kfaraz commented on code in PR #18436:
URL: https://github.com/apache/druid/pull/18436#discussion_r2302821097
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java:
##########
@@ -77,4 +82,6 @@ default int getMaxColumnsToMerge()
{
return IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE;
}
+
+
Review Comment:
Nit: extra new lines
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java:
##########
@@ -38,6 +38,11 @@ public interface AppenderatorConfig extends TuningConfig
boolean isSkipBytesInMemoryOverheadCheck();
+ default boolean getReleaseLocksOnHandoff()
Review Comment:
```suggestion
default boolean isReleaseLocksOnHandoff()
```
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1559,6 +1567,22 @@ public Void apply(@Nullable Object input)
);
}
+ private void unlockIntervalIfApplicable(Sink abandonedSink)
+ {
+ Interval abandonedInterval = abandonedSink.getInterval();
+ boolean isIntervalActive = sinks.entrySet().stream()
+ .anyMatch(entry -> {
+ Sink sink = entry.getValue();
+ return !Objects.equals(sink, abandonedSink)
+ && sink.isWritable()
+ && sink.getInterval().overlaps(abandonedInterval);
+ });
+ if (!isIntervalActive) {
+ taskIntervalUnlocker.releaseLock(abandonedInterval);
+ }
+ log.info("implement this.");
Review Comment:
```suggestion
    if (isIntervalActive) {
      log.info("Interval[%s] is still being appended to by sink[%s]", abandonedInterval, sinkId);
    } else {
      log.info("Unlocking interval[%s] as there are no more active sinks for it.", abandonedInterval);
      taskIntervalUnlocker.releaseLock(abandonedInterval);
    }
```
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1559,6 +1567,22 @@ public Void apply(@Nullable Object input)
);
}
+ private void unlockIntervalIfApplicable(Sink abandonedSink)
+ {
+ Interval abandonedInterval = abandonedSink.getInterval();
+ boolean isIntervalActive = sinks.entrySet().stream()
+ .anyMatch(entry -> {
+ Sink sink = entry.getValue();
+ return !Objects.equals(sink, abandonedSink)
+ && sink.isWritable()
Review Comment:
What do these two conditions signify?
Please add a javadoc to this method briefly calling out the cases when the
interval would be unlocked.
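For reference, a javadoc along these lines might cover it (wording illustrative, based on my reading of the two conditions):

```java
/**
 * Releases the task lock on the interval of {@code abandonedSink} if no other sink
 * still needs it. The lock is retained if any other sink
 * <ul>
 *   <li>is still writable, i.e. may still receive rows, and</li>
 *   <li>has an interval overlapping the abandoned sink's interval.</li>
 * </ul>
 * Otherwise, no active sink maps to this interval anymore and the lock can be
 * safely released.
 */
private void unlockIntervalIfApplicable(Sink abandonedSink)
```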
##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
}
}
+
+ @Test
+ public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
Review Comment:
```suggestion
public void test_dropSegment_unlocksInterval() throws Exception
```
##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
}
}
+
+ @Test
+ public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
+ {
+ final List<Interval> unlockedIntervals = new ArrayList<>();
+ final TaskIntervalUnlocker mockUnlocker = interval -> {
+ synchronized (unlockedIntervals) {
+ unlockedIntervals.add(interval);
+ }
+ };
+
+ try (final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .maxRowsInMemory(2)
+ .releaseLocksOnHandoff(true)
+ .taskIntervalUnlocker(mockUnlocker)
+ .build()) {
+ final Appenderator appenderator = tester.getAppenderator();
+
+ appenderator.startJob();
+
+ final SegmentIdWithShardSpec identifier1 = si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
+ final SegmentIdWithShardSpec identifier2 = si("2000-01-01T01:00/2000-01-01T02:00", "version1", 0);
+
+ final InputRow row1 = new MapBasedInputRow(
+ DateTimes.of("2000"),
+ ImmutableList.of("dim1"),
+ ImmutableMap.of("dim1", "bar", "met1", 1)
+ );
+
+ final InputRow row2 = new MapBasedInputRow(
+ DateTimes.of("2000-01-01T02:30"),
+ ImmutableList.of("dim1"),
+ ImmutableMap.of("dim1", "baz", "met1", 1)
+ );
+
+ appenderator.add(identifier1, row1, Suppliers.ofInstance(Committers.nil()), false);
+ appenderator.add(identifier2, row2, Suppliers.ofInstance(Committers.nil()), false);
+
+ Assert.assertEquals(2, appenderator.getSegments().size());
+
+ synchronized (unlockedIntervals) {
+ unlockedIntervals.clear();
+ }
+
+ appenderator.drop(identifier1).get();
+
+ synchronized (unlockedIntervals) {
+ Assert.assertEquals(1, unlockedIntervals.size());
+ Assert.assertEquals(identifier1.getInterval(), unlockedIntervals.get(0));
+ }
+
+ Assert.assertEquals(1, appenderator.getSegments().size());
+ Assert.assertTrue(appenderator.getSegments().contains(identifier2));
+ }
+ }
+
+ @Test
+ public void test_abandonSegment_shouldNotUnlockInterval() throws Exception
Review Comment:
```suggestion
public void test_dropSegment_skipsUnlockInterval_ifOverlappingSinkIsActive() throws Exception
```
##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java:
##########
@@ -56,28 +59,33 @@ public class EmbeddedKafkaSupervisorTest extends EmbeddedClusterTestBase
private final EmbeddedBroker broker = new EmbeddedBroker();
private final EmbeddedIndexer indexer = new EmbeddedIndexer();
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedHistorical historical = new EmbeddedHistorical();
private KafkaResource kafkaServer;
@Override
public EmbeddedDruidCluster createCluster()
{
final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
+ indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+ overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
kafkaServer = new KafkaResource();
cluster.addExtension(KafkaIndexTaskModule.class)
+ .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
Review Comment:
Why is this needed?
##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java:
##########
@@ -56,28 +59,33 @@ public class EmbeddedKafkaSupervisorTest extends EmbeddedClusterTestBase
private final EmbeddedBroker broker = new EmbeddedBroker();
private final EmbeddedIndexer indexer = new EmbeddedIndexer();
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedHistorical historical = new EmbeddedHistorical();
private KafkaResource kafkaServer;
@Override
public EmbeddedDruidCluster createCluster()
{
final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
+ indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+ overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
Review Comment:
Overlord does not seem to be polling segments in this test. We can remove
this.
##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java:
##########
@@ -174,7 +174,8 @@ public BatchAppenderatorTester(
0L,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
- basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory
+ basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory,
+ false
Review Comment:
Pass null for default value.
##########
services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java:
##########
@@ -374,6 +375,17 @@ public SupervisorStatus getSupervisorStatus(String supervisorId)
throw new ISE("Could not find supervisor[%s]", supervisorId);
}
+ /**
+ * Fetches the currently locked intervals by tasks.
+ *
+ * @param lockFilterPolicies List of filters for different datasources.
+ * @return Map from datasource name to list of intervals locked by tasks.
+ */
+ public Map<String, List<Interval>> getLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
Review Comment:
Nit: I don't think this warrants a new method. This can be inlined in the
one place it is currently being used.
##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java:
##########
@@ -2911,7 +2911,8 @@ private KafkaIndexTask createTask(
maxParseExceptions,
maxSavedParseExceptions,
null,
- null
+ null,
+ false
Review Comment:
Nit: Pass `null` instead so that we fall back to the default, even if it
changes in the future.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java:
##########
@@ -293,6 +298,12 @@ public int getMaxColumnsToMerge()
return maxColumnsToMerge;
}
+ @JsonProperty
+ public boolean getReleaseLockOnHandoff()
Review Comment:
Nit: Primitive boolean getters are named `isXyz()` by convention.
Boxed `Boolean` getters may be named `getXyz()`.
```suggestion
public boolean isReleaseLockOnHandoff()
```
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskIntervalUnlocker.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.joda.time.Interval;
+
+/**
+ * This interface provides a callback mechanism to interact with TaskLockbox for releasing interval locks when
+ * the segments are handed off. We need this interface to avoid cyclic dependencies because the
+ * {@code TaskLockbox} is in the druid-indexing-service module.
+ */
+@FunctionalInterface
+public interface TaskIntervalUnlocker
+{
+ /**
+ * Releases the lock for the given interval.
Review Comment:
We should check the implementation on the Overlord side and call out here
whether this releases locks for this exact interval or any lock held by this
Task that _overlaps_ this interval.
##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java:
##########
@@ -119,6 +127,13 @@ public void test_runKafkaSupervisor()
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
supervisorStatus = cluster.callApi().getSupervisorStatus(supervisorId);
Assertions.assertTrue(supervisorStatus.isSuspended());
+ indexer.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("ingest/handoff/count")
+ .hasDimension(DruidMetrics.DATASOURCE, List.of(dataSource)),
+ agg -> agg.hasSumAtLeast(10)
+ );
+ Thread.sleep(5000);
Review Comment:
```suggestion
    overlord.latchableEmitter().waitForEventAggregate(
        event -> event.hasMetricName("task/action/run/time")
                      .hasDimension(DruidMetrics.DATASOURCE, dataSource)
                      .hasDimension(DruidMetrics.TASK_ACTION_TYPE, "lockRelease"),
        agg -> agg.hasCountAtLeast(10)
    );
```
##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
}
}
+
+ @Test
+ public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
+ {
+ final List<Interval> unlockedIntervals = new ArrayList<>();
+ final TaskIntervalUnlocker mockUnlocker = interval -> {
+ synchronized (unlockedIntervals) {
+ unlockedIntervals.add(interval);
+ }
+ };
+
+ try (final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .maxRowsInMemory(2)
+ .releaseLocksOnHandoff(true)
+ .taskIntervalUnlocker(mockUnlocker)
+ .build()) {
+ final Appenderator appenderator = tester.getAppenderator();
+
+ appenderator.startJob();
+
+ final SegmentIdWithShardSpec identifier1 = si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
+ final SegmentIdWithShardSpec identifier2 = si("2000-01-01T01:00/2000-01-01T02:00", "version1", 0);
+
+ final InputRow row1 = new MapBasedInputRow(
+ DateTimes.of("2000"),
+ ImmutableList.of("dim1"),
+ ImmutableMap.of("dim1", "bar", "met1", 1)
+ );
+
+ final InputRow row2 = new MapBasedInputRow(
+ DateTimes.of("2000-01-01T02:30"),
+ ImmutableList.of("dim1"),
+ ImmutableMap.of("dim1", "baz", "met1", 1)
+ );
+
+ appenderator.add(identifier1, row1, Suppliers.ofInstance(Committers.nil()), false);
+ appenderator.add(identifier2, row2, Suppliers.ofInstance(Committers.nil()), false);
+
+ Assert.assertEquals(2, appenderator.getSegments().size());
+
+ synchronized (unlockedIntervals) {
+ unlockedIntervals.clear();
+ }
Review Comment:
Isn't this a fresh list? Why do we need to clear it?
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java:
##########
@@ -38,6 +38,11 @@ public interface AppenderatorConfig extends TuningConfig
boolean isSkipBytesInMemoryOverheadCheck();
+ default boolean getReleaseLocksOnHandoff()
Review Comment:
Please add a short javadoc.
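Something short like this could work (exact wording up to you):

```java
/**
 * Whether the task should release its lock on an interval as soon as all
 * segments for that interval have been handed off, rather than holding all
 * locks until the task finishes.
 */
default boolean getReleaseLocksOnHandoff()
```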
##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
}
}
+
+ @Test
+ public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
+ {
+ final List<Interval> unlockedIntervals = new ArrayList<>();
+ final TaskIntervalUnlocker mockUnlocker = interval -> {
Review Comment:
```suggestion
final TaskIntervalUnlocker intervalUnlocker = interval -> {
```
##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
}
}
+
+ @Test
+ public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
+ {
+ final List<Interval> unlockedIntervals = new ArrayList<>();
+ final TaskIntervalUnlocker mockUnlocker = interval -> {
+ synchronized (unlockedIntervals) {
+ unlockedIntervals.add(interval);
+ }
+ };
+
+ try (final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .maxRowsInMemory(2)
+ .releaseLocksOnHandoff(true)
+ .taskIntervalUnlocker(mockUnlocker)
+ .build()) {
+ final Appenderator appenderator = tester.getAppenderator();
+
+ appenderator.startJob();
+
+ final SegmentIdWithShardSpec identifier1 = si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
+ final SegmentIdWithShardSpec identifier2 = si("2000-01-01T01:00/2000-01-01T02:00", "version1", 0);
Review Comment:
```suggestion
final SegmentIdWithShardSpec segmentId2 = si("2000-01-01T01:00/2000-01-01T02:00", "version1", 0);
```
##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
}
}
+
+ @Test
+ public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
+ {
+ final List<Interval> unlockedIntervals = new ArrayList<>();
+ final TaskIntervalUnlocker mockUnlocker = interval -> {
+ synchronized (unlockedIntervals) {
+ unlockedIntervals.add(interval);
+ }
+ };
+
+ try (final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .maxRowsInMemory(2)
+ .releaseLocksOnHandoff(true)
+ .taskIntervalUnlocker(mockUnlocker)
+ .build()) {
+ final Appenderator appenderator = tester.getAppenderator();
+
+ appenderator.startJob();
+
+ final SegmentIdWithShardSpec identifier1 = si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
Review Comment:
```suggestion
final SegmentIdWithShardSpec segmentId1 = si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
```
##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
}
}
+
+ @Test
+ public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
+ {
+ final List<Interval> unlockedIntervals = new ArrayList<>();
+ final TaskIntervalUnlocker mockUnlocker = interval -> {
+ synchronized (unlockedIntervals) {
+ unlockedIntervals.add(interval);
+ }
+ };
+
+ try (final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .maxRowsInMemory(2)
+ .releaseLocksOnHandoff(true)
+ .taskIntervalUnlocker(mockUnlocker)
+ .build()) {
+ final Appenderator appenderator = tester.getAppenderator();
+
+ appenderator.startJob();
+
+ final SegmentIdWithShardSpec identifier1 = si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
+ final SegmentIdWithShardSpec identifier2 = si("2000-01-01T01:00/2000-01-01T02:00", "version1", 0);
+
+ final InputRow row1 = new MapBasedInputRow(
+ DateTimes.of("2000"),
+ ImmutableList.of("dim1"),
+ ImmutableMap.of("dim1", "bar", "met1", 1)
+ );
+
+ final InputRow row2 = new MapBasedInputRow(
+ DateTimes.of("2000-01-01T02:30"),
+ ImmutableList.of("dim1"),
+ ImmutableMap.of("dim1", "baz", "met1", 1)
+ );
+
+ appenderator.add(identifier1, row1, Suppliers.ofInstance(Committers.nil()), false);
+ appenderator.add(identifier2, row2, Suppliers.ofInstance(Committers.nil()), false);
+
+ Assert.assertEquals(2, appenderator.getSegments().size());
+
+ synchronized (unlockedIntervals) {
+ unlockedIntervals.clear();
+ }
+
+ appenderator.drop(identifier1).get();
+
+ synchronized (unlockedIntervals) {
Review Comment:
You can avoid the `synchronized` by using a thread-safe collection like some
concurrent map or set.
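For example, a `ConcurrentLinkedQueue` (or a concurrent set) makes the recording lambda safe without explicit locking. Minimal sketch, using `String` in place of `org.joda.time.Interval` so it stays dependency-free, and a hypothetical `IntervalRecorder` standing in for `TaskIntervalUnlocker`:

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for TaskIntervalUnlocker, with String instead of Interval.
interface IntervalRecorder {
    void releaseLock(String interval);
}

public class ThreadSafeRecorderSketch {
    public static void main(String[] args) {
        // ConcurrentLinkedQueue is safe for concurrent writers, so the test
        // lambda no longer needs a synchronized block around each add().
        ConcurrentLinkedQueue<String> unlockedIntervals = new ConcurrentLinkedQueue<>();
        IntervalRecorder recorder = unlockedIntervals::add;

        recorder.releaseLock("2000-01-01T00:00/2000-01-01T01:00");

        // Snapshot and assert, no lock required for reads either.
        List<String> snapshot = List.copyOf(unlockedIntervals);
        if (snapshot.size() != 1) {
            throw new AssertionError("expected exactly one recorded interval");
        }
        System.out.println("recorded=" + snapshot.get(0));
    }
}
```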
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskIntervalUnlocker.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.joda.time.Interval;
+
+/**
+ * This interface provides a callback mechanism to interact with TaskLockbox for releasing interval locks when
+ * the segments are handed off. We need this interface to avoid cyclic dependencies because the
+ * {@code TaskLockbox} is in the druid-indexing-service module.
+ */
+@FunctionalInterface
+public interface TaskIntervalUnlocker
+{
+ /**
+ * Releases the lock for the given interval.
+ *
+ * @param interval interval for which the lock needs to be released
Review Comment:
We can omit this as it doesn't add any new info.
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1559,6 +1567,22 @@ public Void apply(@Nullable Object input)
);
}
+ private void unlockIntervalIfApplicable(Sink abandonedSink)
+ {
+ Interval abandonedInterval = abandonedSink.getInterval();
+ boolean isIntervalActive = sinks.entrySet().stream()
+ .anyMatch(entry -> {
+ Sink sink = entry.getValue();
+ return !Objects.equals(sink, abandonedSink)
+ && sink.isWritable()
+ && sink.getInterval().overlaps(abandonedInterval);
+ });
+ if (!isIntervalActive) {
+ taskIntervalUnlocker.releaseLock(abandonedInterval);
+ }
+ log.info("implement this.");
Review Comment:
Also, given that this is a new feature, I am not sure if we should throw an
exception when the unlock fails.
Can you please check whether throwing an exception here would cause the task
itself to fail or have any other side effects?
Let's do the following instead:
- Add `throws IOException` to `TaskIntervalLocker.releaseLock()`
- Put the actual call to `taskIntervalUnlocker.releaseLock()` inside
try/catch
- Emit an alert if unlock fails
In a follow up PR:
Let's also discuss the possibility of some clean up mechanism that gets rid
of orphan locks.
Maybe the Overlord could have some kind of duty to perform automated clean
up of orphan locks.
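A minimal sketch of that flow (all names here are illustrative, not the actual Druid API; in Druid the alert would go through the `ServiceEmitter` rather than stderr):

```java
import java.io.IOException;

public class UnlockFailureSketch {
    // Proposed shape: releaseLock() declares the checked exception.
    @FunctionalInterface
    interface TaskIntervalUnlocker {
        void releaseLock(String interval) throws IOException;
    }

    static boolean alertEmitted = false;

    // Caller catches the failure and alerts instead of failing the task.
    static void unlockQuietly(TaskIntervalUnlocker unlocker, String interval) {
        try {
            unlocker.releaseLock(interval);
        } catch (IOException e) {
            // The task keeps running; the orphan lock is left for a future
            // Overlord cleanup duty, as discussed above.
            alertEmitted = true;
            System.err.println("Failed to unlock interval[" + interval + "]: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        unlockQuietly(i -> { throw new IOException("overlord unreachable"); }, "2000/2001");
        if (!alertEmitted) {
            throw new AssertionError("expected alert on unlock failure");
        }
        System.out.println("task survived unlock failure");
    }
}
```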
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]