This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 07feec8af33 Add supervisor config to release task lock after segment
handoff (#18436)
07feec8af33 is described below
commit 07feec8af336cda0332768d23d38e5219067619f
Author: Uddeshya Singh <[email protected]>
AuthorDate: Sat Aug 30 10:27:46 2025 +0530
Add supervisor config to release task lock after segment handoff (#18436)
Changes:
- Add supervisor io config `releaseLocksOnHandoff`
- Add interface `TaskIntervalUnlocker`
- Use `TaskIntervalUnlocker` in `StreamAppenderator` to release a lock if
there are no more overlapping sinks
---
.../embedded/indexing/IngestionSmokeTest.java | 2 +-
.../embedded/indexing/KafkaClusterMetricsTest.java | 2 +-
.../embedded/indexing/KafkaDataFormatsTest.java | 2 +-
.../RabbitStreamIndexTaskTuningConfig.java | 3 +-
.../indexing/kafka/KafkaIndexTaskTuningConfig.java | 15 ++--
.../supervisor/KafkaSupervisorTuningConfig.java | 10 ++-
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 1 +
.../kafka/KafkaIndexTaskTuningConfigTest.java | 6 +-
.../simulate/EmbeddedKafkaSupervisorTest.java | 28 ++++++-
.../kafka/supervisor/KafkaSupervisorTest.java | 7 ++
.../TestModifiedKafkaIndexTaskTuningConfig.java | 3 +-
.../kinesis/KinesisIndexTaskTuningConfig.java | 3 +-
.../SeekableStreamAppenderatorConfig.java | 6 ++
.../seekablestream/SeekableStreamIndexTask.java | 6 +-
.../SeekableStreamIndexTaskTuningConfig.java | 17 +++-
.../common/task/TestAppenderatorsManager.java | 7 +-
.../SeekableStreamSupervisorSpecTest.java | 1 +
.../SeekableStreamSupervisorStateTest.java | 1 +
.../realtime/appenderator/AppenderatorConfig.java | 8 ++
.../realtime/appenderator/Appenderators.java | 6 +-
.../appenderator/AppenderatorsManager.java | 3 +-
.../DummyForInjectionAppenderatorsManager.java | 3 +-
.../appenderator/PeonAppenderatorsManager.java | 6 +-
.../realtime/appenderator/StreamAppenderator.java | 33 +++++++-
.../appenderator/TaskIntervalUnlocker.java | 38 +++++++++
.../UnifiedIndexerAppenderatorsManager.java | 12 ++-
.../appenderator/BatchAppenderatorTester.java | 3 +-
.../appenderator/StreamAppenderatorTest.java | 97 ++++++++++++++++++++++
.../appenderator/StreamAppenderatorTester.java | 35 ++++++--
.../appenderator/TestAppenderatorConfig.java | 16 +++-
30 files changed, 339 insertions(+), 41 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
index 5b5f4e5083c..af9c3d21c8e 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
@@ -348,7 +348,7 @@ public class IngestionSmokeTest extends
EmbeddedClusterTestBase
null, null, null,
1,
null, null, null, null, null, null, null, null, null, null,
- null, null, null, null, null, null, null, null, null, null
+ null, null, null, null, null, null, null, null, null, null, null
);
}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index aa3667405bd..2cf03e99ce9 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -449,7 +449,7 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
null, null, null,
maxRowsPerSegment,
null, null, null, null, null, null, null, null, null, null,
- null, null, null, null, null, null, null, null, null, null
+ null, null, null, null, null, null, null, null, null, null, null
);
}
}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
index 4a97ba8811c..c8fff7347d5 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
@@ -643,7 +643,7 @@ public class KafkaDataFormatsTest extends
EmbeddedClusterTestBase
null, null, null,
1,
null, null, null, null, null, null, null, null, null, null,
- null, null, null, null, null, null, null, null, null, null
+ null, null, null, null, null, null, null, null, null, null, null
);
}
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java
index 288e28966a0..dd5bc84d523 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java
@@ -100,7 +100,8 @@ public class RabbitStreamIndexTaskTuningConfig extends
SeekableStreamIndexTaskTu
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads,
- maxColumnsToMerge
+ maxColumnsToMerge,
+ null
);
this.recordBufferSize = recordBufferSize;
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
index b0b358da20b..3fa046b63e6 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
@@ -53,7 +53,8 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
@Nullable Integer maxParseExceptions,
@Nullable Integer maxSavedParseExceptions,
@Nullable Integer numPersistThreads,
- @Nullable Integer maxColumnsToMerge
+ @Nullable Integer maxColumnsToMerge,
+ @Nullable Boolean releaseLocksOnHandoff
)
{
super(
@@ -78,7 +79,8 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads,
- maxColumnsToMerge
+ maxColumnsToMerge,
+ releaseLocksOnHandoff
);
}
@@ -103,7 +105,8 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer
maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
- @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
+ @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
+ @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean
releaseLocksOnHandoff
)
{
this(
@@ -127,7 +130,8 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads,
- maxColumnsToMerge
+ maxColumnsToMerge,
+ releaseLocksOnHandoff
);
}
@@ -155,7 +159,8 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getNumPersistThreads(),
- getMaxColumnsToMerge()
+ getMaxColumnsToMerge(),
+ isReleaseLocksOnHandoff()
);
}
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
index 1e0b3587409..c4a21674d30 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
@@ -66,6 +66,7 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
null,
null,
null,
+ null,
null
);
}
@@ -95,7 +96,8 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer
maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
- @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
+ @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
+ @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean
releaseLocksOnHandoff
)
{
super(
@@ -119,7 +121,8 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads,
- maxColumnsToMerge
+ maxColumnsToMerge,
+ releaseLocksOnHandoff
);
this.workerThreads = workerThreads;
this.chatRetries = (chatRetries != null ? chatRetries :
DEFAULT_CHAT_RETRIES);
@@ -233,7 +236,8 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getNumPersistThreads(),
- getMaxColumnsToMerge()
+ getMaxColumnsToMerge(),
+ isReleaseLocksOnHandoff()
);
}
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index f736d01bacb..9954de88c26 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2911,6 +2911,7 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
maxParseExceptions,
maxSavedParseExceptions,
null,
+ null,
null
);
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 20777b320a5..132b65a15cb 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -155,7 +155,8 @@ public class KafkaIndexTaskTuningConfigTest
null,
null,
2,
- 5
+ 5,
+ false
);
KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig();
@@ -198,7 +199,8 @@ public class KafkaIndexTaskTuningConfigTest
42,
42,
2,
- -1
+ -1,
+ false
);
String serialized = mapper.writeValueAsString(base);
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
index bab227dc864..058f3ffa736 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
@@ -34,21 +34,26 @@ import
org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
import org.apache.druid.testing.embedded.EmbeddedIndexer;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
public class EmbeddedKafkaSupervisorTest extends EmbeddedClusterTestBase
@@ -56,12 +61,14 @@ public class EmbeddedKafkaSupervisorTest extends
EmbeddedClusterTestBase
private final EmbeddedBroker broker = new EmbeddedBroker();
private final EmbeddedIndexer indexer = new EmbeddedIndexer();
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedHistorical historical = new EmbeddedHistorical();
private KafkaResource kafkaServer;
@Override
public EmbeddedDruidCluster createCluster()
{
final EmbeddedDruidCluster cluster =
EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
+ indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
kafkaServer = new KafkaResource();
@@ -71,6 +78,7 @@ public class EmbeddedKafkaSupervisorTest extends
EmbeddedClusterTestBase
.addServer(new EmbeddedCoordinator())
.addServer(overlord)
.addServer(indexer)
+ .addServer(historical)
.addServer(broker);
return cluster;
@@ -82,8 +90,9 @@ public class EmbeddedKafkaSupervisorTest extends
EmbeddedClusterTestBase
final String topic = dataSource;
kafkaServer.createTopicWithPartitions(topic, 2);
+ final int expectedSegments = 10;
kafkaServer.produceRecordsToTopic(
- generateRecordsForTopic(topic, 10, DateTimes.of("2025-06-01"))
+ generateRecordsForTopic(topic, expectedSegments,
DateTimes.of("2025-06-01"))
);
// Submit and start a supervisor
@@ -119,6 +128,21 @@ public class EmbeddedKafkaSupervisorTest extends
EmbeddedClusterTestBase
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
supervisorStatus = cluster.callApi().getSupervisorStatus(supervisorId);
Assertions.assertTrue(supervisorStatus.isSuspended());
+ indexer.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("ingest/handoff/count")
+ .hasDimension(DruidMetrics.DATASOURCE,
List.of(dataSource)),
+ agg -> agg.hasSumAtLeast(expectedSegments)
+ );
+ overlord.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("task/action/run/time")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+ .hasDimension(DruidMetrics.TASK_ACTION_TYPE,
"lockRelease"),
+ agg -> agg.hasCountAtLeast(expectedSegments)
+ );
+ List<LockFilterPolicy> lockFilterPolicies = List.of(new
LockFilterPolicy(dataSource, 0, null, null));
+ Map<String, List<Interval>> lockedIntervals = cluster.callApi()
+
.onLeaderOverlord(client -> client.findLockedIntervals(lockFilterPolicies));
+ Assertions.assertEquals(0, lockedIntervals.size());
}
private KafkaSupervisorSpec createKafkaSupervisor(String supervisorId,
String topic)
@@ -154,7 +178,7 @@ public class EmbeddedKafkaSupervisorTest extends
EmbeddedClusterTestBase
null, null, null,
1,
null, null, null, null, null, null, null, null, null, null,
- null, null, null, null, null, null, null, null, null, null
+ null, null, null, null, new Period("PT2S"), null, null, null, null,
null, true
);
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index eabacd33a45..9b80d2db41a 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -357,6 +357,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
+ null,
null
);
@@ -532,6 +533,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
+ null,
null
),
null
@@ -4672,6 +4674,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
+ null,
null
)
);
@@ -4712,6 +4715,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
+ null,
null
);
@@ -4866,6 +4870,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
+ null,
null
)
);
@@ -5386,6 +5391,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
10,
null,
+ null,
null
);
@@ -5503,6 +5509,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
+ null,
null
);
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
index a2b7228bb46..08a083fe112 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
@@ -81,7 +81,8 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends
KafkaIndexTaskTuning
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads,
- maxColumnsToMerge
+ maxColumnsToMerge,
+ null
);
this.extra = extra;
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
index 97d63913bc1..5720ce2ae22 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
@@ -112,7 +112,8 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
maxParseExceptions,
maxSavedParseExceptions,
null,
- maxColumnsToMerge
+ maxColumnsToMerge,
+ false
);
this.recordBufferSize = recordBufferSize;
this.recordBufferSizeBytes = recordBufferSizeBytes;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
index f6490579c75..6e4065ea1c9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
@@ -192,6 +192,12 @@ public class SeekableStreamAppenderatorConfig implements
AppenderatorConfig
return maxColumnsToMerge;
}
+ @Override
+ public boolean isReleaseLocksOnHandoff()
+ {
+ return tuningConfig.isReleaseLocksOnHandoff();
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 59dc4c433dc..24f74b8da8b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -32,6 +32,7 @@ import
org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
@@ -228,7 +229,10 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
toolbox.getPolicyEnforcer(),
rowIngestionMeters,
parseExceptionHandler,
- toolbox.getCentralizedTableSchemaConfig()
+ toolbox.getCentralizedTableSchemaConfig(),
+ interval -> {
+ toolbox.getTaskActionClient().submit(new
LockReleaseAction(interval));
+ }
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index 124682bcb2e..ea531d77f27 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
@@ -41,6 +42,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements TuningConfi
private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new
Period("PT10M");
private static final IndexSpec DEFAULT_INDEX_SPEC = IndexSpec.DEFAULT;
private static final Boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = Boolean.FALSE;
+ private static final boolean DEFAULT_RELEASE_LOCKS_ON_HANDOFF = false;
private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT =
Duration.ofMinutes(15).toMillis();
private final AppendableIndexSpec appendableIndexSpec;
@@ -68,6 +70,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements TuningConfi
private final int numPersistThreads;
private final int maxColumnsToMerge;
+ private final boolean releaseLocksOnHandoff;
public SeekableStreamIndexTaskTuningConfig(
@Nullable AppendableIndexSpec appendableIndexSpec,
@@ -91,7 +94,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements TuningConfi
@Nullable Integer maxParseExceptions,
@Nullable Integer maxSavedParseExceptions,
@Nullable Integer numPersistThreads,
- @Nullable Integer maxColumnsToMerge
+ @Nullable Integer maxColumnsToMerge,
+ @Nullable Boolean releaseLocksOnHandoff
)
{
this.appendableIndexSpec = appendableIndexSpec == null ?
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
@@ -146,6 +150,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements TuningConfi
this.numPersistThreads = Math.max(numPersistThreads,
AppenderatorConfig.DEFAULT_NUM_PERSIST_THREADS);
}
this.maxColumnsToMerge = maxColumnsToMerge == null ?
DEFAULT_MAX_COLUMNS_TO_MERGE : maxColumnsToMerge;
+ this.releaseLocksOnHandoff = Configs.valueOrDefault(releaseLocksOnHandoff,
DEFAULT_RELEASE_LOCKS_ON_HANDOFF);
}
@Override
@@ -293,6 +298,12 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements TuningConfi
return maxColumnsToMerge;
}
+ @JsonProperty
+ public boolean isReleaseLocksOnHandoff()
+ {
+ return releaseLocksOnHandoff;
+ }
+
public abstract SeekableStreamIndexTaskTuningConfig
withBasePersistDirectory(File dir);
@Override
@@ -319,6 +330,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements TuningConfi
maxSavedParseExceptions == that.maxSavedParseExceptions &&
numPersistThreads == that.numPersistThreads &&
maxColumnsToMerge == that.maxColumnsToMerge &&
+ releaseLocksOnHandoff == that.releaseLocksOnHandoff &&
Objects.equals(partitionsSpec, that.partitionsSpec) &&
Objects.equals(intermediatePersistPeriod,
that.intermediatePersistPeriod) &&
Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
@@ -352,7 +364,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements TuningConfi
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads,
- maxColumnsToMerge
+ maxColumnsToMerge,
+ releaseLocksOnHandoff
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index 1a501abba70..e859eb9700d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -45,6 +45,7 @@ import
org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.realtime.appenderator.Appenderators;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.TaskDirectory;
+import org.apache.druid.segment.realtime.appenderator.TaskIntervalUnlocker;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
@@ -75,7 +76,8 @@ public class TestAppenderatorsManager implements
AppenderatorsManager
PolicyEnforcer policyEnforcer,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+ CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+ TaskIntervalUnlocker taskIntervalUnlocker
)
{
realtimeAppenderator = Appenderators.createRealtime(
@@ -98,7 +100,8 @@ public class TestAppenderatorsManager implements
AppenderatorsManager
policyEnforcer,
rowIngestionMeters,
parseExceptionHandler,
- centralizedDatasourceSchemaConfig
+ centralizedDatasourceSchemaConfig,
+ taskIntervalUnlocker
);
return realtimeAppenderator;
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index a663c7c19aa..f0ba8966626 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -519,6 +519,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
null,
null,
null,
+ null,
null
)
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index feba656ee63..54bf3c84bce 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -2837,6 +2837,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
null,
+ null,
null
)
{
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
index 7e5f1b3f25f..a88bc864c94 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
@@ -38,6 +38,14 @@ public interface AppenderatorConfig extends TuningConfig
boolean isSkipBytesInMemoryOverheadCheck();
+ /**
+ * @return true if locks should be released after segments have been handed
off to Historicals.
+ */
+ default boolean isReleaseLocksOnHandoff()
+ {
+ return false;
+ }
+
default int getNumPersistThreads()
{
return DEFAULT_NUM_PERSIST_THREADS;
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index 360ea444c6d..096db844e4d 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -68,7 +68,8 @@ public class Appenderators
PolicyEnforcer policyEnforcer,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+ CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+ TaskIntervalUnlocker taskIntervalUnlocker
)
{
return new StreamAppenderator(
@@ -99,7 +100,8 @@ public class Appenderators
cache,
rowIngestionMeters,
parseExceptionHandler,
- centralizedDatasourceSchemaConfig
+ centralizedDatasourceSchemaConfig,
+ taskIntervalUnlocker
);
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index 97f8c07c78e..dea08ecddbf 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -89,7 +89,8 @@ public interface AppenderatorsManager
PolicyEnforcer policyEnforcer,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+ CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+ TaskIntervalUnlocker taskIntervalUnlocker
);
/**
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index fcf541aed41..70a820f9886 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -79,7 +79,8 @@ public class DummyForInjectionAppenderatorsManager implements
AppenderatorsManag
PolicyEnforcer policyEnforcer,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+ CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+ TaskIntervalUnlocker taskIntervalUnlocker
)
{
throw new UOE(ERROR_MSG);
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 1f894bff3cc..518e3f74ad9 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -85,7 +85,8 @@ public class PeonAppenderatorsManager implements
AppenderatorsManager
PolicyEnforcer policyEnforcer,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+ CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+ TaskIntervalUnlocker taskIntervalUnlocker
)
{
if (realtimeAppenderator != null) {
@@ -113,7 +114,8 @@ public class PeonAppenderatorsManager implements
AppenderatorsManager
policyEnforcer,
rowIngestionMeters,
parseExceptionHandler,
- centralizedDatasourceSchemaConfig
+ centralizedDatasourceSchemaConfig,
+ taskIntervalUnlocker
);
}
return realtimeAppenderator;
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 0d41f396f0e..8a64225964c 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -202,6 +202,7 @@ public class StreamAppenderator implements Appenderator
private final SegmentLoaderConfig segmentLoaderConfig;
private ScheduledExecutorService exec;
private final FingerprintGenerator fingerprintGenerator;
+ private final TaskIntervalUnlocker taskIntervalUnlocker;
/**
* This constructor allows the caller to provide its own
SinkQuerySegmentWalker.
@@ -227,7 +228,8 @@ public class StreamAppenderator implements Appenderator
Cache cache,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+ CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+ TaskIntervalUnlocker taskIntervalUnlocker
)
{
this.segmentLoaderConfig = segmentLoaderConfig;
@@ -255,6 +257,7 @@ public class StreamAppenderator implements Appenderator
Execs.makeThreadFactory("StreamAppenderSegmentRemoval-%s")
);
this.fingerprintGenerator = new FingerprintGenerator(objectMapper);
+ this.taskIntervalUnlocker = taskIntervalUnlocker;
}
@VisibleForTesting
@@ -1525,6 +1528,10 @@ public class StreamAppenderator implements Appenderator
removeDirectory(computePersistDir(identifier));
}
+ if (tuningConfig.isReleaseLocksOnHandoff()) {
+ unlockIntervalIfApplicable(sink);
+ }
+
log.info("Dropped segment[%s].", identifier);
};
@@ -1559,6 +1566,30 @@ public class StreamAppenderator implements Appenderator
);
}
+ /**
+ * Unlock the interval if there are more active sinks writing for this
interval.
+ * The interval will be unlocked if there is no other sink writing to any
overlapping intervals.
+ */
+ private void unlockIntervalIfApplicable(Sink abandonedSink)
+ {
+ Interval abandonedInterval = abandonedSink.getInterval();
+ boolean isIntervalActive = sinks.entrySet().stream()
+ .anyMatch(entry -> {
+ return
entry.getValue().getInterval().overlaps(abandonedInterval);
+ });
+ if (isIntervalActive) {
+ log.info("Interval[%s] is still being appended to by other sinks.",
abandonedInterval);
+ } else {
+ log.info("Unlocking interval[%s] as there are no more active sinks for
it.", abandonedInterval);
+ try {
+ taskIntervalUnlocker.releaseLock(abandonedInterval);
+ }
+ catch (IOException e) {
+ log.makeAlert(e, "Failed to unlock interval[%s]",
abandonedInterval).emit();
+ }
+ }
+ }
+
private Committed readCommit() throws IOException
{
final File commitFile = computeCommitFile();
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskIntervalUnlocker.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskIntervalUnlocker.java
new file mode 100644
index 00000000000..00c23a997ff
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskIntervalUnlocker.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.joda.time.Interval;
+
+import java.io.IOException;
+
+/**
+ * Used to release task locks after segments have been handed off, typically
with long-running tasks
+ * to avoid holding locks for longer than necessary. This interface is used
instead of {@code TaskActionClient}
+ * to prevent a cyclic dependency with druid-indexing-service module.
+ */
+@FunctionalInterface
+public interface TaskIntervalUnlocker
+{
+ /**
+ * Releases the lock for the exact interval for a task.
+ */
+ void releaseLock(Interval interval) throws IOException;
+}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 9a4c20e9a07..9eafd853d3f 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -170,7 +170,8 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
PolicyEnforcer policyEnforcer,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+ CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+ TaskIntervalUnlocker taskIntervalUnlocker
)
{
synchronized (this) {
@@ -194,7 +195,8 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
cache,
rowIngestionMeters,
parseExceptionHandler,
- centralizedDatasourceSchemaConfig
+ centralizedDatasourceSchemaConfig,
+ taskIntervalUnlocker
)
{
@Override
@@ -507,6 +509,12 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
{
return baseConfig.getNumPersistThreads();
}
+
+ @Override
+ public boolean isReleaseLocksOnHandoff()
+ {
+ return baseConfig.isReleaseLocksOnHandoff();
+ }
}
private IndexMerger wrapIndexMerger(IndexMerger baseMerger)
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
index 19e75a712f3..6ac64dc0570 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
@@ -174,7 +174,8 @@ public class BatchAppenderatorTester implements
AutoCloseable
0L,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
- basePersistDirectory == null ? createNewBasePersistDirectory() :
basePersistDirectory
+ basePersistDirectory == null ? createNewBasePersistDirectory() :
basePersistDirectory,
+ null
);
metrics = new SegmentGenerationMetrics();
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index f7e2bb0b57e..22074f4f78f 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -63,6 +63,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
+import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -2449,6 +2450,102 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
}
}
+
+ @Test
+ public void test_dropSegment_unlocksInterval() throws Exception
+ {
+ final List<Interval> unlockedIntervals = Collections.synchronizedList(new
ArrayList<>());
+ final TaskIntervalUnlocker intervalUnlocker = unlockedIntervals::add;
+
+ try (final StreamAppenderatorTester tester = new
StreamAppenderatorTester.Builder()
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .maxRowsInMemory(2)
+ .releaseLocksOnHandoff(true)
+ .taskIntervalUnlocker(intervalUnlocker)
+ .build()) {
+ final Appenderator appenderator = tester.getAppenderator();
+
+ appenderator.startJob();
+
+ final SegmentIdWithShardSpec segmentId1 =
si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
+ final SegmentIdWithShardSpec segmentId2 =
si("2000-01-01T01:00/2000-01-01T02:00", "version1", 0);
+
+ final InputRow row1 = new MapBasedInputRow(
+ DateTimes.of("2000"),
+ List.of("dim1"),
+ Map.of("dim1", "bar", "met1", 1)
+ );
+
+ final InputRow row2 = new MapBasedInputRow(
+ DateTimes.of("2000-01-01T02:30"),
+ List.of("dim1"),
+ Map.of("dim1", "baz", "met1", 1)
+ );
+
+ appenderator.add(segmentId1, row1,
Suppliers.ofInstance(Committers.nil()), false);
+ appenderator.add(segmentId2, row2,
Suppliers.ofInstance(Committers.nil()), false);
+
+ Assert.assertEquals(2, appenderator.getSegments().size());
+
+ appenderator.drop(segmentId1).get();
+
+ synchronized (unlockedIntervals) {
+ Assert.assertEquals(1, unlockedIntervals.size());
+ Assert.assertEquals(segmentId1.getInterval(),
unlockedIntervals.get(0));
+ }
+
+ Assert.assertEquals(1, appenderator.getSegments().size());
+ Assert.assertTrue(appenderator.getSegments().contains(segmentId2));
+ }
+ }
+
+ @Test
+ public void test_dropSegment_skipsUnlockInterval_ifOverlappingSinkIsActive()
throws Exception
+ {
+ final List<Interval> unlockedIntervals = Collections.synchronizedList(new
ArrayList<>());
+ final TaskIntervalUnlocker intervalUnlocker = unlockedIntervals::add;
+
+ try (final StreamAppenderatorTester tester = new
StreamAppenderatorTester.Builder()
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .maxRowsInMemory(2)
+ .releaseLocksOnHandoff(true)
+ .taskIntervalUnlocker(intervalUnlocker)
+ .build()) {
+ final Appenderator appenderator = tester.getAppenderator();
+
+ appenderator.startJob();
+
+ final SegmentIdWithShardSpec segmentId1 =
si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
+ final SegmentIdWithShardSpec segmentId2 =
si("2000-01-01T00:30/2000-01-01T01:30", "version2", 0);
+
+ final InputRow row1 = new MapBasedInputRow(
+ DateTimes.of("2000"),
+ List.of("dim1"),
+ Map.of("dim1", "bar", "met1", 1)
+ );
+
+ final InputRow row2 = new MapBasedInputRow(
+ DateTimes.of("2000-01-01T02:30"),
+ List.of("dim1"),
+ Map.of("dim1", "baz", "met1", 1)
+ );
+
+ appenderator.add(segmentId1, row1,
Suppliers.ofInstance(Committers.nil()), false);
+ appenderator.add(segmentId2, row2,
Suppliers.ofInstance(Committers.nil()), false);
+
+ Assert.assertEquals(2, appenderator.getSegments().size());
+
+ appenderator.drop(segmentId1).get();
+
+ synchronized (unlockedIntervals) {
+ Assert.assertEquals(0, unlockedIntervals.size());
+ }
+
+ Assert.assertEquals(1, appenderator.getSegments().size());
+ Assert.assertTrue(appenderator.getSegments().contains(segmentId2));
+ }
+ }
+
private static SegmentIdWithShardSpec si(String interval, String version,
int partitionNum)
{
return new SegmentIdWithShardSpec(
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index bdde04a2a2f..d4d0c97535a 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -113,7 +113,9 @@ public class StreamAppenderatorTester implements
AutoCloseable
final DataSegmentAnnouncer announcer,
final CentralizedDatasourceSchemaConfig
centralizedDatasourceSchemaConfig,
final ServiceEmitter serviceEmitter,
- final PolicyEnforcer policyEnforcer
+ final PolicyEnforcer policyEnforcer,
+ final boolean releaseLocksOnHandoff,
+ final TaskIntervalUnlocker taskIntervalUnlocker
)
{
objectMapper = new DefaultObjectMapper();
@@ -159,7 +161,8 @@ public class StreamAppenderatorTester implements
AutoCloseable
0L,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
- basePersistDirectory
+ basePersistDirectory,
+ releaseLocksOnHandoff
);
metrics = new SegmentGenerationMetrics();
@@ -250,7 +253,8 @@ public class StreamAppenderatorTester implements
AutoCloseable
policyEnforcer,
rowIngestionMeters,
new ParseExceptionHandler(rowIngestionMeters, false,
Integer.MAX_VALUE, 0),
- centralizedDatasourceSchemaConfig
+ centralizedDatasourceSchemaConfig,
+ taskIntervalUnlocker
);
} else {
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
@@ -292,7 +296,8 @@ public class StreamAppenderatorTester implements
AutoCloseable
NoopPolicyEnforcer.instance(),
rowIngestionMeters,
new ParseExceptionHandler(rowIngestionMeters, false,
Integer.MAX_VALUE, 0),
- centralizedDatasourceSchemaConfig
+ centralizedDatasourceSchemaConfig,
+ taskIntervalUnlocker
);
}
}
@@ -357,6 +362,8 @@ public class StreamAppenderatorTester implements
AutoCloseable
private int delayInMilli = 0;
private ServiceEmitter serviceEmitter;
private PolicyEnforcer policyEnforcer = NoopPolicyEnforcer.instance();
+ private boolean releaseLocksOnHandoff;
+ private TaskIntervalUnlocker taskIntervalUnlocker = interval -> {};
public Builder maxRowsInMemory(final int maxRowsInMemory)
{
@@ -412,6 +419,18 @@ public class StreamAppenderatorTester implements
AutoCloseable
return this;
}
+ public Builder releaseLocksOnHandoff(boolean releaseLocksOnHandoff)
+ {
+ this.releaseLocksOnHandoff = releaseLocksOnHandoff;
+ return this;
+ }
+
+ public Builder taskIntervalUnlocker(TaskIntervalUnlocker
taskIntervalUnlocker)
+ {
+ this.taskIntervalUnlocker = taskIntervalUnlocker;
+ return this;
+ }
+
public StreamAppenderatorTester build()
{
return new StreamAppenderatorTester(
@@ -425,7 +444,9 @@ public class StreamAppenderatorTester implements
AutoCloseable
new NoopDataSegmentAnnouncer(),
CentralizedDatasourceSchemaConfig.create(),
serviceEmitter,
- policyEnforcer
+ policyEnforcer,
+ releaseLocksOnHandoff,
+ taskIntervalUnlocker
);
}
@@ -445,7 +466,9 @@ public class StreamAppenderatorTester implements
AutoCloseable
dataSegmentAnnouncer,
config,
serviceEmitter,
- policyEnforcer
+ policyEnforcer,
+ releaseLocksOnHandoff,
+ taskIntervalUnlocker
);
}
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestAppenderatorConfig.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestAppenderatorConfig.java
index 5dba99de500..58d00b899ae 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestAppenderatorConfig.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestAppenderatorConfig.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.realtime.appenderator;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
@@ -45,6 +46,7 @@ public class TestAppenderatorConfig implements
AppenderatorConfig
private final IndexSpec indexSpecForIntermediatePersists;
@Nullable
private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
+ private final boolean releaseLocksOnHandoff;
public TestAppenderatorConfig(
AppendableIndexSpec appendableIndexSpec,
@@ -57,7 +59,8 @@ public class TestAppenderatorConfig implements
AppenderatorConfig
Long pushTimeout,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
Integer maxColumnsToMerge,
- File basePersistDirectory
+ File basePersistDirectory,
+ @Nullable Boolean releaseLocksOnHandoff
)
{
this.appendableIndexSpec = appendableIndexSpec;
@@ -74,6 +77,7 @@ public class TestAppenderatorConfig implements
AppenderatorConfig
this.partitionsSpec = null;
this.indexSpecForIntermediatePersists = this.indexSpec;
+ this.releaseLocksOnHandoff = Configs.valueOrDefault(releaseLocksOnHandoff,
false);
}
@Override
@@ -162,6 +166,12 @@ public class TestAppenderatorConfig implements
AppenderatorConfig
return new Period(Integer.MAX_VALUE); // intermediate persist doesn't make
much sense for batch jobs
}
+ @Override
+ public boolean isReleaseLocksOnHandoff()
+ {
+ return releaseLocksOnHandoff;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -180,6 +190,7 @@ public class TestAppenderatorConfig implements
AppenderatorConfig
maxPendingPersists == that.maxPendingPersists &&
reportParseExceptions == that.reportParseExceptions &&
pushTimeout == that.pushTimeout &&
+ releaseLocksOnHandoff == that.releaseLocksOnHandoff &&
Objects.equals(partitionsSpec, that.partitionsSpec) &&
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(indexSpecForIntermediatePersists,
that.indexSpecForIntermediatePersists) &&
@@ -203,7 +214,8 @@ public class TestAppenderatorConfig implements
AppenderatorConfig
maxPendingPersists,
reportParseExceptions,
pushTimeout,
- segmentWriteOutMediumFactory
+ segmentWriteOutMediumFactory,
+ releaseLocksOnHandoff
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]