This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a5e9b14be0e Add delay before the peon drops the segments after
publishing them (#15373)
a5e9b14be0e is described below
commit a5e9b14be0e4968358f1153d673c7c9a53ad6c70
Author: kaisun2000 <[email protected]>
AuthorDate: Mon Jan 1 21:38:28 2024 -0800
Add delay before the peon drops the segments after publishing them (#15373)
Currently, in the realtime ingestion (Kafka/Kinesis) case, after publishing
the segments and receiving acknowledgement from the coordinator that the
segments have been loaded on some historicals, the peon unannounces the
segments (telling the whole cluster that they are no longer served by this
peon) and drops them from the cache and sink timeline in one shot.
In-transit queries from brokers that still think the segments are on the
peon can hit a NullPointerException while the peon is unsetting the
hydrants in the sinks.
The fix makes the peon wait for a configurable delay period after it
unannounces the segments before it drops them, removes them from the cache,
etc.
This delayed approach is similar to how the historicals handle segments
moving out.
---
.../apache/druid/indexing/common/TaskToolbox.java | 18 +++
.../druid/indexing/common/TaskToolboxFactory.java | 5 +
.../task/AppenderatorDriverRealtimeIndexTask.java | 1 +
.../seekablestream/SeekableStreamIndexTask.java | 1 +
.../druid/indexing/common/TaskToolboxTest.java | 9 ++
.../AppenderatorDriverRealtimeIndexTaskTest.java | 1 +
.../common/task/RealtimeIndexTaskTest.java | 1 +
.../common/task/TestAppenderatorsManager.java | 3 +
.../overlord/SingleTaskBackgroundRunnerTest.java | 1 +
.../druid/indexing/overlord/TaskLifecycleTest.java | 1 +
.../indexing/overlord/TestTaskToolboxFactory.java | 1 +
.../SeekableStreamIndexTaskTestBase.java | 1 +
.../indexing/worker/WorkerTaskManagerTest.java | 1 +
.../indexing/worker/WorkerTaskMonitorTest.java | 1 +
.../realtime/appenderator/Appenderators.java | 3 +
.../appenderator/AppenderatorsManager.java | 2 +
.../DefaultRealtimeAppenderatorFactory.java | 1 +
.../DummyForInjectionAppenderatorsManager.java | 2 +
.../appenderator/PeonAppenderatorsManager.java | 3 +
.../realtime/appenderator/StreamAppenderator.java | 80 +++++++++---
.../UnifiedIndexerAppenderatorsManager.java | 3 +
.../appenderator/StreamAppenderatorTest.java | 98 +++++++++++++++
.../appenderator/StreamAppenderatorTester.java | 134 +++++++++++++++------
23 files changed, 319 insertions(+), 52 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index 9ebfb96b567..3e1ac720bb9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -55,6 +55,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentCacheManager;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import
org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -78,6 +79,7 @@ import java.util.Collection;
*/
public class TaskToolbox
{
+ private final SegmentLoaderConfig segmentLoaderConfig;
private final TaskConfig config;
private final DruidNode taskExecutorNode;
private final TaskActionClient taskActionClient;
@@ -130,6 +132,7 @@ public class TaskToolbox
private final String attemptId;
public TaskToolbox(
+ SegmentLoaderConfig segmentLoaderConfig,
TaskConfig config,
DruidNode taskExecutorNode,
TaskActionClient taskActionClient,
@@ -171,6 +174,7 @@ public class TaskToolbox
String attemptId
)
{
+ this.segmentLoaderConfig = segmentLoaderConfig;
this.config = config;
this.taskExecutorNode = taskExecutorNode;
this.taskActionClient = taskActionClient;
@@ -213,6 +217,11 @@ public class TaskToolbox
this.attemptId = attemptId;
}
+ public SegmentLoaderConfig getSegmentLoaderConfig()
+ {
+ return segmentLoaderConfig;
+ }
+
public TaskConfig getConfig()
{
return config;
@@ -504,6 +513,7 @@ public class TaskToolbox
public static class Builder
{
+ private SegmentLoaderConfig segmentLoaderConfig;
private TaskConfig config;
private DruidNode taskExecutorNode;
private TaskActionClient taskActionClient;
@@ -550,6 +560,7 @@ public class TaskToolbox
public Builder(TaskToolbox other)
{
+ this.segmentLoaderConfig = other.segmentLoaderConfig;
this.config = other.config;
this.taskExecutorNode = other.taskExecutorNode;
this.taskActionClient = other.taskActionClient;
@@ -589,6 +600,12 @@ public class TaskToolbox
this.shuffleClient = other.shuffleClient;
}
+ public Builder config(final SegmentLoaderConfig segmentLoaderConfig)
+ {
+ this.segmentLoaderConfig = segmentLoaderConfig;
+ return this;
+ }
+
public Builder config(final TaskConfig config)
{
this.config = config;
@@ -826,6 +843,7 @@ public class TaskToolbox
public TaskToolbox build()
{
return new TaskToolbox(
+ segmentLoaderConfig,
config,
taskExecutorNode,
taskActionClient,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 288d89919b9..f2df3ddc3a3 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -56,6 +56,7 @@ import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.DruidNode;
@@ -72,6 +73,7 @@ import java.util.function.Function;
*/
public class TaskToolboxFactory
{
+ private final SegmentLoaderConfig segmentLoaderConfig;
private final TaskConfig config;
private final DruidNode taskExecutorNode;
private final TaskActionClientFactory taskActionClientFactory;
@@ -115,6 +117,7 @@ public class TaskToolboxFactory
@Inject
public TaskToolboxFactory(
+ SegmentLoaderConfig segmentLoadConfig,
TaskConfig config,
@Parent DruidNode taskExecutorNode,
TaskActionClientFactory taskActionClientFactory,
@@ -155,6 +158,7 @@ public class TaskToolboxFactory
@AttemptId String attemptId
)
{
+ this.segmentLoaderConfig = segmentLoadConfig;
this.config = config;
this.taskExecutorNode = taskExecutorNode;
this.taskActionClientFactory = taskActionClientFactory;
@@ -210,6 +214,7 @@ public class TaskToolboxFactory
final File taskWorkDir = config.getTaskWorkDir(task.getId());
return new TaskToolbox.Builder()
.config(config)
+ .config(segmentLoaderConfig)
.taskExecutorNode(taskExecutorNode)
.taskActionClient(taskActionClientFactory.create(task))
.emitter(emitter)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 3a599dd485b..9e8817fc5cc 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -775,6 +775,7 @@ public class AppenderatorDriverRealtimeIndexTask extends
AbstractTask implements
)
{
return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask(
+ null,
getId(),
dataSchema,
tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index d74ee5c0be2..c881b3814e3 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -187,6 +187,7 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
)
{
return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask(
+ toolbox.getSegmentLoaderConfig(),
getId(),
dataSchema,
tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index c1f7a549d65..75ac2eeb670 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -48,6 +48,7 @@ import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import
org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
@@ -94,6 +95,7 @@ public class TaskToolboxTest
private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class);
private Cache mockCache = EasyMock.createMock(Cache.class);
private CacheConfig mockCacheConfig = EasyMock.createMock(CacheConfig.class);
+ private SegmentLoaderConfig segmentLoaderConfig =
EasyMock.createMock(SegmentLoaderConfig.class);
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -115,6 +117,7 @@ public class TaskToolboxTest
.build();
taskToolbox = new TaskToolboxFactory(
+ segmentLoaderConfig,
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null,
true, false),
mockTaskActionClientFactory,
@@ -162,6 +165,12 @@ public class TaskToolboxTest
Assert.assertEquals(mockDataSegmentArchiver,
taskToolbox.build(task).getDataSegmentArchiver());
}
+ @Test
+ public void testGetSegmentLoaderConfig()
+ {
+ Assert.assertEquals(segmentLoaderConfig,
taskToolbox.build(task).getSegmentLoaderConfig());
+ }
+
@Test
public void testGetSegmentAnnouncer()
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 5e088724f89..5b400a65111 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1606,6 +1606,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
extends InitializedNullHand
};
final TestUtils testUtils = new TestUtils();
taskToolboxFactory = new TaskToolboxFactory(
+ null,
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null,
true, false),
taskActionClientFactory,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 9881561d61f..e38f59d6d7a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -979,6 +979,7 @@ public class RealtimeIndexTaskTest extends
InitializedNullHandlingTest
};
final TestUtils testUtils = new TestUtils();
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
+ null,
taskConfig,
null, // taskExecutorNode
taskActionClientFactory,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index c4d4364f434..24e7797602e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -36,6 +36,7 @@ import
org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
@@ -50,6 +51,7 @@ public class TestAppenderatorsManager implements
AppenderatorsManager
@Override
public Appenderator createRealtimeAppenderatorForTask(
+ SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
@@ -72,6 +74,7 @@ public class TestAppenderatorsManager implements
AppenderatorsManager
)
{
realtimeAppenderator = Appenderators.createRealtime(
+ segmentLoaderConfig,
taskId,
schema,
config,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 087ae3e1fc1..3e9d776f8c8 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -97,6 +97,7 @@ public class SingleTaskBackgroundRunnerTest
final ServiceEmitter emitter = new NoopServiceEmitter();
EmittingLogger.registerEmitter(emitter);
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
+ null,
taskConfig,
null,
EasyMock.createMock(TaskActionClientFactory.class),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 627c161863b..f8809bb5052 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -619,6 +619,7 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
.build();
return new TaskToolboxFactory(
+ null,
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null,
true, false),
tac,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
index 5c6afdbb61b..33dc249fb41 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
@@ -77,6 +77,7 @@ public class TestTaskToolboxFactory extends TaskToolboxFactory
)
{
super(
+ null,
bob.config,
bob.taskExecutorNode,
bob.taskActionClientFactory,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 6be23407a41..2a1a8ac0b08 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -662,6 +662,7 @@ public abstract class SeekableStreamIndexTaskTestBase
extends EasyMockSupport
final DataSegmentPusher dataSegmentPusher = new
LocalDataSegmentPusher(dataSegmentPusherConfig);
toolboxFactory = new TaskToolboxFactory(
+ null,
taskConfig,
null, // taskExecutorNode
taskActionClientFactory,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 93c5635492d..fb1eba7644b 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -128,6 +128,7 @@ public class WorkerTaskManagerTest
jsonMapper,
new TestTaskRunner(
new TaskToolboxFactory(
+ null,
taskConfig,
null,
taskActionClientFactory,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index aecfe29ab1f..94d70545803 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -170,6 +170,7 @@ public class WorkerTaskMonitorTest
jsonMapper,
new SingleTaskBackgroundRunner(
new TaskToolboxFactory(
+ null,
taskConfig,
null,
taskActionClientFactory,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index b087c91988c..974f4a9773e 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -35,6 +35,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
@@ -43,6 +44,7 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
public class Appenderators
{
public static Appenderator createRealtime(
+ SegmentLoaderConfig segmentLoaderConfig,
String id,
DataSchema schema,
AppenderatorConfig config,
@@ -65,6 +67,7 @@ public class Appenderators
)
{
return new StreamAppenderator(
+ segmentLoaderConfig,
id,
schema,
config,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index 75e078e1bb9..7d76f14c4c0 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -36,6 +36,7 @@ import
org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
@@ -64,6 +65,7 @@ public interface AppenderatorsManager
* used for query processing.
*/
Appenderator createRealtimeAppenderatorForTask(
+ SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
index 7698d41c8a0..4f5c5ed5b75 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
@@ -96,6 +96,7 @@ public class DefaultRealtimeAppenderatorFactory implements
AppenderatorFactory
{
final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters();
return Appenderators.createRealtime(
+ null,
schema.getDataSource(),
schema,
config.withBasePersistDirectory(
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index 10939cf5356..281f053fecb 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -37,6 +37,7 @@ import
org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
@@ -55,6 +56,7 @@ public class DummyForInjectionAppenderatorsManager implements
AppenderatorsManag
@Override
public Appenderator createRealtimeAppenderatorForTask(
+ SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 82df08c665a..2370eb98d01 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -37,6 +37,7 @@ import
org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
@@ -61,6 +62,7 @@ public class PeonAppenderatorsManager implements
AppenderatorsManager
@Override
public Appenderator createRealtimeAppenderatorForTask(
+ SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
@@ -88,6 +90,7 @@ public class PeonAppenderatorsManager implements
AppenderatorsManager
throw new ISE("A batch appenderator was already created for this peon's
task.");
} else {
realtimeAppenderator = Appenderators.createRealtime(
+ segmentLoaderConfig,
taskId,
schema,
config,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index f21f67ed504..83e4f990709 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -67,6 +67,7 @@ import
org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
@@ -95,6 +96,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -170,6 +173,9 @@ public class StreamAppenderator implements Appenderator
private volatile Throwable persistError;
+ private final SegmentLoaderConfig segmentLoaderConfig;
+ private ScheduledExecutorService exec;
+
/**
* This constructor allows the caller to provide its own
SinkQuerySegmentWalker.
*
@@ -180,6 +186,7 @@ public class StreamAppenderator implements Appenderator
* Appenderators.
*/
StreamAppenderator(
+ SegmentLoaderConfig segmentLoaderConfig,
String id,
DataSchema schema,
AppenderatorConfig tuningConfig,
@@ -196,6 +203,7 @@ public class StreamAppenderator implements Appenderator
boolean useMaxMemoryEstimates
)
{
+ this.segmentLoaderConfig = segmentLoaderConfig;
this.myId = id;
this.schema = Preconditions.checkNotNull(schema, "schema");
this.tuningConfig = Preconditions.checkNotNull(tuningConfig,
"tuningConfig");
@@ -221,6 +229,20 @@ public class StreamAppenderator implements Appenderator
maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
skipBytesInMemoryOverheadCheck =
tuningConfig.isSkipBytesInMemoryOverheadCheck();
this.useMaxMemoryEstimates = useMaxMemoryEstimates;
+
+ this.exec = Executors.newScheduledThreadPool(
+ 1,
+ Execs.makeThreadFactory("StreamAppenderSegmentRemoval-%s")
+ );
+ }
+
+ @VisibleForTesting
+ void setExec(ScheduledExecutorService testExec)
+ {
+ if (exec != null) {
+ exec.shutdown();
+ }
+ exec = testExec;
}
@Override
@@ -1170,6 +1192,10 @@ public class StreamAppenderator implements Appenderator
if (intermediateTempExecutor != null) {
intermediateTempExecutor.shutdownNow();
}
+
+ if (exec != null) {
+ exec.shutdownNow();
+ }
}
private void resetNextFlush()
@@ -1400,24 +1426,48 @@ public class StreamAppenderator implements Appenderator
.emit();
}
- droppingSinks.remove(identifier);
- sinkTimeline.remove(
- sink.getInterval(),
- sink.getVersion(),
- identifier.getShardSpec().createChunk(sink)
- );
- for (FireHydrant hydrant : sink) {
- if (cache != null) {
-
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
+ Runnable removeRunnable = () -> {
+ droppingSinks.remove(identifier);
+ sinkTimeline.remove(
+ sink.getInterval(),
+ sink.getVersion(),
+ identifier.getShardSpec().createChunk(sink)
+ );
+ for (FireHydrant hydrant : sink) {
+ if (cache != null) {
+
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
+ }
+ hydrant.swapSegment(null);
}
- hydrant.swapSegment(null);
- }
- if (removeOnDiskData) {
- removeDirectory(computePersistDir(identifier));
- }
+ if (removeOnDiskData) {
+ removeDirectory(computePersistDir(identifier));
+ }
+
+ log.info("Dropped segment[%s].", identifier);
+ };
- log.info("Dropped segment[%s].", identifier);
+ if (segmentLoaderConfig == null) {
+ log.info(
+ "Unannounced segment[%s]",
+ identifier
+ );
+ removeRunnable.run();
+ } else {
+ log.info(
+ "Unannounced segment[%s], scheduling drop in [%d] millisecs",
+ identifier,
+ segmentLoaderConfig.getDropSegmentDelayMillis()
+ );
+ // Keep the segments in the cache and sinkTimeline for
dropSegmentDelay after unannouncing the segments
+ // This way, in transit queries which still see the segments in
this peon would be able to query the
+ // segments and not throw NullPtr exceptions.
+ exec.schedule(
+ removeRunnable,
+ segmentLoaderConfig.getDropSegmentDelayMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ }
return null;
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 0a728eb890c..b9be326c822 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -62,6 +62,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -149,6 +150,7 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
@Override
public Appenderator createRealtimeAppenderatorForTask(
+ SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
@@ -177,6 +179,7 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
);
Appenderator appenderator = new StreamAppenderator(
+ null,
taskId,
schema,
rewriteAppenderatorConfigMemoryLimits(config),
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index 2e05cb9053f..bf3458b0975 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -61,6 +61,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class StreamAppenderatorTest extends InitializedNullHandlingTest
@@ -950,6 +953,101 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
}
}
+ @Test
+ public void testDelayedDrop() throws Exception
+ {
+ class TestScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor
+ {
+ ScheduledFuture<?> scheduledFuture;
+
+ public TestScheduledThreadPoolExecutor()
+ {
+ super(1);
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(
+ Runnable command,
+ long delay, TimeUnit unit
+ )
+ {
+ ScheduledFuture<?> future = super.schedule(command, delay, unit);
+ scheduledFuture = future;
+ return future;
+ }
+
+ ScheduledFuture<?> getLastScheduledFuture()
+ {
+ return scheduledFuture;
+ }
+ }
+
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+ .withSegmentDropDelayInMilli(1000)
+ .build()) {
+ final Appenderator appenderator = tester.getAppenderator();
+ TestScheduledThreadPoolExecutor testExec = new TestScheduledThreadPoolExecutor();
+ ((StreamAppenderator) appenderator).setExec(testExec);
+
+ appenderator.startJob();
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil()));
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil()));
+ appenderator.add(IDENTIFIERS.get(1), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil()));
+ appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil()));
+ appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil()));
+ appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil()));
+ appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil()));
+
+ // Query1: 2000/2001
+ final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
+ .dataSource(StreamAppenderatorTester.DATASOURCE)
+ .intervals(ImmutableList.of(Intervals.of("2000/2001")))
+ .aggregators(
+ Arrays.asList(
+ new LongSumAggregatorFactory("count", "count"),
+ new LongSumAggregatorFactory("met", "met")
+ )
+ )
+ .granularity(Granularities.DAY)
+ .build();
+
+ appenderator.drop(IDENTIFIERS.get(0)).get();
+
+ // segment 0 won't be dropped immediately
+ final List<Result<TimeseriesResultValue>> results1 =
+ QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList();
+ Assert.assertEquals(
+ "query1",
+ ImmutableList.of(
+ new Result<>(
+ DateTimes.of("2000"),
+ new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L))
+ )
+ ),
+ results1
+ );
+
+ // segment 0 would eventually be dropped at some time after 1 secs drop delay
+ testExec.getLastScheduledFuture().get(5000, TimeUnit.MILLISECONDS);
+
+ final List<Result<TimeseriesResultValue>> results = QueryPlus.wrap(query1)
+ .run(appenderator, ResponseContext.createEmpty())
+ .toList();
+ List<Result<TimeseriesResultValue>> expectedResults =
+ ImmutableList.of(
+ new Result<>(
+ DateTimes.of("2000"),
+ new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L))
+ )
+ );
+ Assert.assertEquals("query after dropped", expectedResults, results);
+ }
+ }
+
@Test
public void testQueryByIntervals() throws Exception
{
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index 217c90116c3..3663af38b01 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -63,6 +63,7 @@ import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
@@ -93,6 +94,7 @@ public class StreamAppenderatorTester implements AutoCloseable
private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
public StreamAppenderatorTester(
+ final int delayInMilli,
final int maxRowsInMemory,
final long maxSizeInBytes,
final File basePersistDirectory,
@@ -209,43 +211,93 @@ public class StreamAppenderatorTester implements AutoCloseable
throw new UnsupportedOperationException();
}
};
- appenderator = Appenderators.createRealtime(
- schema.getDataSource(),
- schema,
- tuningConfig,
- metrics,
- dataSegmentPusher,
- objectMapper,
- indexIO,
- indexMerger,
- new DefaultQueryRunnerFactoryConglomerate(
- ImmutableMap.of(
- TimeseriesQuery.class, new TimeseriesQueryRunnerFactory(
- new TimeseriesQueryQueryToolChest(),
- new TimeseriesQueryEngine(),
- QueryRunnerTestHelper.NOOP_QUERYWATCHER
- ),
- ScanQuery.class, new ScanQueryRunnerFactory(
- new ScanQueryQueryToolChest(
- new ScanQueryConfig(),
- new DefaultGenericQueryMetricsFactory()
- ),
- new ScanQueryEngine(),
- new ScanQueryConfig()
- )
- )
- ),
- new NoopDataSegmentAnnouncer(),
- emitter,
- new ForwardingQueryProcessingPool(queryExecutor),
- NoopJoinableFactory.INSTANCE,
- MapCache.create(2048),
- new CacheConfig(),
- new CachePopulatorStats(),
- rowIngestionMeters,
- new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
- true
- );
+ if (delayInMilli <= 0) {
+ appenderator = Appenderators.createRealtime(
+ null,
+ schema.getDataSource(),
+ schema,
+ tuningConfig,
+ metrics,
+ dataSegmentPusher,
+ objectMapper,
+ indexIO,
+ indexMerger,
+ new DefaultQueryRunnerFactoryConglomerate(
+ ImmutableMap.of(
+ TimeseriesQuery.class, new TimeseriesQueryRunnerFactory(
+ new TimeseriesQueryQueryToolChest(),
+ new TimeseriesQueryEngine(),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ ),
+ ScanQuery.class, new ScanQueryRunnerFactory(
+ new ScanQueryQueryToolChest(
+ new ScanQueryConfig(),
+ new DefaultGenericQueryMetricsFactory()
+ ),
+ new ScanQueryEngine(),
+ new ScanQueryConfig()
+ )
+ )
+ ),
+ new NoopDataSegmentAnnouncer(),
+ emitter,
+ new ForwardingQueryProcessingPool(queryExecutor),
+ NoopJoinableFactory.INSTANCE,
+ MapCache.create(2048),
+ new CacheConfig(),
+ new CachePopulatorStats(),
+ rowIngestionMeters,
+ new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
+ true
+ );
+ } else {
+ SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
+ {
+ @Override
+ public int getDropSegmentDelayMillis()
+ {
+ return delayInMilli;
+ }
+ };
+ appenderator = Appenderators.createRealtime(
+ segmentLoaderConfig,
+ schema.getDataSource(),
+ schema,
+ tuningConfig,
+ metrics,
+ dataSegmentPusher,
+ objectMapper,
+ indexIO,
+ indexMerger,
+ new DefaultQueryRunnerFactoryConglomerate(
+ ImmutableMap.of(
+ TimeseriesQuery.class, new TimeseriesQueryRunnerFactory(
+ new TimeseriesQueryQueryToolChest(),
+ new TimeseriesQueryEngine(),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ ),
+ ScanQuery.class, new ScanQueryRunnerFactory(
+ new ScanQueryQueryToolChest(
+ new ScanQueryConfig(),
+ new DefaultGenericQueryMetricsFactory()
+ ),
+ new ScanQueryEngine(),
+ new ScanQueryConfig()
+ )
+ )
+ ),
+ new NoopDataSegmentAnnouncer(),
+ emitter,
+ new ForwardingQueryProcessingPool(queryExecutor),
+ NoopJoinableFactory.INSTANCE,
+ MapCache.create(2048),
+ new CacheConfig(),
+ new CachePopulatorStats(),
+ rowIngestionMeters,
+ new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
+ true
+ );
+ }
}
private long getDefaultMaxBytesInMemory()
@@ -305,6 +357,7 @@ public class StreamAppenderatorTester implements AutoCloseable
private boolean enablePushFailure;
private RowIngestionMeters rowIngestionMeters;
private boolean skipBytesInMemoryOverheadCheck;
+ private int delayInMilli = 0;
public Builder maxRowsInMemory(final int maxRowsInMemory)
{
@@ -342,9 +395,16 @@ public class StreamAppenderatorTester implements AutoCloseable
return this;
}
+ public Builder withSegmentDropDelayInMilli(int delayInMilli)
+ {
+ this.delayInMilli = delayInMilli;
+ return this;
+ }
+
public StreamAppenderatorTester build()
{
return new StreamAppenderatorTester(
+ delayInMilli,
maxRowsInMemory,
maxSizeInBytes,
Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]