This is an automated email from the ASF dual-hosted git repository.
asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 01e9d963bd8 Merge hydrant runners flatly for realtime queries. (#15757)
01e9d963bd8 is described below
commit 01e9d963bd8615ccf035c2f301d87fc193837f7a
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Jan 25 03:07:57 2024 -0800
Merge hydrant runners flatly for realtime queries. (#15757)
* Merge hydrant runners flatly for realtime queries.
Prior to this patch, we have two layers of mergeRunners for realtime
queries: one for each Sink (a logical segment) and one across all
Sinks. This is done because to keep metrics and results grouped by Sink,
given that each FireHydrant within a Sink has its own separate storage
adapter.
However, it costs extra memory usage due to the extra layer of
materialization. This is especially pronounced for groupBy queries,
which only use their merge buffers at the top layer of merging. The
lower layer of merging materializes ResultRows directly into the heap,
which can cause heap exhaustion if there are enough ResultRows.
This patch changes to a single layer of merging when bySegment: false,
just like Historicals. To accommodate that, segment metrics like
query/segment/time are now per-FireHydrant instead of per-Sink.
Two layers of merging are retained when bySegment: true. This isn't
common, because it's typically only used when segment level caching
is enabled on the Broker, which is off by default.
* Use SinkQueryRunners.
* Remove unused method.
---
.../common/task/TestAppenderatorsManager.java | 1 -
.../realtime/appenderator/Appenderators.java | 4 -
.../DefaultRealtimeAppenderatorFactory.java | 1 -
.../appenderator/PeonAppenderatorsManager.java | 1 -
.../appenderator/SinkQuerySegmentWalker.java | 303 +++++++++++----------
.../UnifiedIndexerAppenderatorsManager.java | 1 -
.../segment/realtime/plumber/FlushingPlumber.java | 1 -
.../segment/realtime/plumber/RealtimePlumber.java | 4 -
.../realtime/plumber/RealtimePlumberSchool.java | 1 -
.../appenderator/StreamAppenderatorTester.java | 3 -
10 files changed, 163 insertions(+), 157 deletions(-)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index be03fafd357..f54b0ecd2af 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -89,7 +89,6 @@ public class TestAppenderatorsManager implements
AppenderatorsManager
segmentAnnouncer,
emitter,
queryProcessingPool,
- joinableFactory,
cache,
cacheConfig,
cachePopulatorStats,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index 47cd058b04d..51115c48bae 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -32,8 +32,6 @@ import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
@@ -58,7 +56,6 @@ public class Appenderators
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
QueryProcessingPool queryProcessingPool,
- JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats,
@@ -86,7 +83,6 @@ public class Appenderators
emitter,
conglomerate,
queryProcessingPool,
- new JoinableFactoryWrapper(joinableFactory),
Preconditions.checkNotNull(cache, "cache"),
cacheConfig,
cachePopulatorStats
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
index 960779fbf16..e64c315484d 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
@@ -115,7 +115,6 @@ public class DefaultRealtimeAppenderatorFactory implements
AppenderatorFactory
segmentAnnouncer,
emitter,
queryProcessingPool,
- joinableFactory,
cache,
cacheConfig,
cachePopulatorStats,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 070ac62568a..dba96acc66a 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -105,7 +105,6 @@ public class PeonAppenderatorsManager implements
AppenderatorsManager
segmentAnnouncer,
emitter,
queryProcessingPool,
- joinableFactory,
cache,
cacheConfig,
cachePopulatorStats,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 7c81b60feab..1bf07fa4146 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
@@ -59,7 +58,6 @@ import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.segment.realtime.plumber.SinkSegmentReference;
@@ -69,13 +67,18 @@ import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.Interval;
-import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* Query handler for indexing tasks.
@@ -92,7 +95,6 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
private final ServiceEmitter emitter;
private final QueryRunnerFactoryConglomerate conglomerate;
private final QueryProcessingPool queryProcessingPool;
- private final JoinableFactoryWrapper joinableFactoryWrapper;
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
@@ -106,7 +108,6 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
QueryProcessingPool queryProcessingPool,
- JoinableFactoryWrapper joinableFactoryWrapper,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
@@ -118,7 +119,6 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
this.conglomerate = Preconditions.checkNotNull(conglomerate,
"conglomerate");
this.queryProcessingPool = Preconditions.checkNotNull(queryProcessingPool,
"queryProcessingPool");
- this.joinableFactoryWrapper = joinableFactoryWrapper;
this.cache = Preconditions.checkNotNull(cache, "cache");
this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats,
"cachePopulatorStats");
@@ -186,110 +186,170 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
// We compute the join cache key here itself so it doesn't need to be
re-computed for every segment
final Optional<byte[]> cacheKeyPrefix =
Optional.ofNullable(query.getDataSource().getCacheKey());
- Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
- specs,
- newDescriptor -> {
- final SegmentDescriptor descriptor =
newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
- final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
- descriptor.getInterval(),
- descriptor.getVersion(),
- descriptor.getPartitionNumber()
+ // We need to report data for each Sink all-or-nothing, which means we
need to acquire references for all
+ // subsegments (FireHydrants) of a segment (Sink) at once. To ensure they
are properly released even when a
+ // query fails or is canceled, we acquire *all* sink reference upfront,
and release them all when the main
+ // QueryRunner returned by this method is closed. (We can't do the
acquisition and releasing at the level of
+ // each FireHydrant's runner, since then it wouldn't be properly
all-or-nothing on a per-Sink basis.)
+ final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
+ final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
+ final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners =
new LinkedHashMap<>();
+
+ try {
+ for (final SegmentDescriptor newDescriptor : specs) {
+ final SegmentDescriptor descriptor =
newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
+ final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
+ descriptor.getInterval(),
+ descriptor.getVersion(),
+ descriptor.getPartitionNumber()
+ );
+
+ if (chunk == null) {
+ allRunners.put(
+ descriptor,
+ Collections.singletonList(new
ReportTimelineMissingSegmentQueryRunner<>(descriptor))
+ );
+ continue;
+ }
+
+ final Sink theSink = chunk.getObject();
+ final SegmentId sinkSegmentId = theSink.getSegment().getId();
+ segmentIdMap.put(descriptor, sinkSegmentId);
+ final List<SinkSegmentReference> sinkSegmentReferences =
+ theSink.acquireSegmentReferences(segmentMapFn,
skipIncrementalSegment);
+
+ if (sinkSegmentReferences == null) {
+ // We failed to acquire references for all subsegments. Bail and
report the entire sink missing.
+ allRunners.put(
+ descriptor,
+ Collections.singletonList(new
ReportTimelineMissingSegmentQueryRunner<>(descriptor))
);
+ } else if (sinkSegmentReferences.isEmpty()) {
+ allRunners.put(descriptor, Collections.singletonList(new
NoopQueryRunner<>()));
+ } else {
+ allSegmentReferences.addAll(sinkSegmentReferences);
- if (chunk == null) {
- return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
- }
-
- final Sink theSink = chunk.getObject();
- final SegmentId sinkSegmentId = theSink.getSegment().getId();
- final List<SinkSegmentReference> segmentReferences =
- theSink.acquireSegmentReferences(segmentMapFn,
skipIncrementalSegment);
-
- if (segmentReferences == null) {
- // We failed to acquire references for all subsegments. Bail and
report the entire sink missing.
- return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
- } else if (segmentReferences.isEmpty()) {
- return new NoopQueryRunner<>();
- }
-
- final Closeable releaser = () ->
CloseableUtils.closeAll(segmentReferences);
-
- try {
- Iterable<QueryRunner<T>> perHydrantRunners = new
SinkQueryRunners<>(
- Iterables.transform(
- segmentReferences,
- segmentReference -> {
- QueryRunner<T> runner =
factory.createRunner(segmentReference.getSegment());
-
- // 1) Only use caching if data is immutable
- // 2) Hydrants are not the same between replicas, make
sure cache is local
- if (segmentReference.isImmutable() && cache.isLocal()) {
- StorageAdapter storageAdapter =
segmentReference.getSegment().asStorageAdapter();
- long segmentMinTime =
storageAdapter.getMinTime().getMillis();
- long segmentMaxTime =
storageAdapter.getMaxTime().getMillis();
- Interval actualDataInterval =
Intervals.utc(segmentMinTime, segmentMaxTime + 1);
- runner = new CachingQueryRunner<>(
- makeHydrantCacheIdentifier(sinkSegmentId,
segmentReference.getHydrantNumber()),
- cacheKeyPrefix,
- descriptor,
- actualDataInterval,
- objectMapper,
- cache,
- toolChest,
- runner,
- // Always populate in foreground regardless of
config
- new ForegroundCachePopulator(
- objectMapper,
- cachePopulatorStats,
- cacheConfig.getMaxEntrySize()
- ),
- cacheConfig
- );
- }
- return new
Pair<>(segmentReference.getSegment().getDataInterval(), runner);
+ allRunners.put(
+ descriptor,
+ sinkSegmentReferences.stream().map(
+ segmentReference -> {
+ QueryRunner<T> runner = new MetricsEmittingQueryRunner<>(
+ emitter,
+ factory.getToolchest(),
+ factory.createRunner(segmentReference.getSegment()),
+ QueryMetrics::reportSegmentTime,
+ queryMetrics ->
queryMetrics.segment(sinkSegmentId.toString())
+ );
+
+ // 1) Only use caching if data is immutable
+ // 2) Hydrants are not the same between replicas, make
sure cache is local
+ if (segmentReference.isImmutable() && cache.isLocal()) {
+ StorageAdapter storageAdapter =
segmentReference.getSegment().asStorageAdapter();
+ long segmentMinTime =
storageAdapter.getMinTime().getMillis();
+ long segmentMaxTime =
storageAdapter.getMaxTime().getMillis();
+ Interval actualDataInterval =
Intervals.utc(segmentMinTime, segmentMaxTime + 1);
+ runner = new CachingQueryRunner<>(
+ makeHydrantCacheIdentifier(sinkSegmentId,
segmentReference.getHydrantNumber()),
+ cacheKeyPrefix,
+ descriptor,
+ actualDataInterval,
+ objectMapper,
+ cache,
+ toolChest,
+ runner,
+ // Always populate in foreground regardless of config
+ new ForegroundCachePopulator(
+ objectMapper,
+ cachePopulatorStats,
+ cacheConfig.getMaxEntrySize()
+ ),
+ cacheConfig
+ );
}
- )
- );
- return QueryRunnerHelper.makeClosingQueryRunner(
- new SpecificSegmentQueryRunner<>(
- withPerSinkMetrics(
- new BySegmentQueryRunner<>(
- sinkSegmentId,
- descriptor.getInterval().getStart(),
- factory.mergeRunners(
- DirectQueryProcessingPool.INSTANCE,
- perHydrantRunners
- )
- ),
+
+ // Regardless of whether caching is enabled, do
reportSegmentAndCacheTime outside the
+ // *possible* caching.
+ runner = new MetricsEmittingQueryRunner<>(
+ emitter,
+ factory.getToolchest(),
+ runner,
+ QueryMetrics::reportSegmentAndCacheTime,
+ queryMetrics ->
queryMetrics.segment(sinkSegmentId.toString())
+ ).withWaitMeasuredFromNow();
+
+ // Emit CPU time metrics.
+ runner = CPUTimeMetricQueryRunner.safeBuild(
+ runner,
toolChest,
- sinkSegmentId,
- cpuTimeAccumulator
- ),
- new SpecificSegmentSpec(descriptor)
- ),
- releaser
- );
- }
- catch (Throwable e) {
- throw CloseableUtils.closeAndWrapInCatch(e, releaser);
- }
+ emitter,
+ cpuTimeAccumulator,
+ false
+ );
+
+ // Run with specific segment descriptor.
+ runner = new SpecificSegmentQueryRunner<>(
+ runner,
+ new SpecificSegmentSpec(descriptor)
+ );
+
+ return runner;
+ }
+ ).collect(Collectors.toList())
+ );
}
- );
- final QueryRunner<T> mergedRunner =
- toolChest.mergeResults(
- factory.mergeRunners(
- queryProcessingPool,
- perSegmentRunners
- )
+ }
+
+ final QueryRunner<T> mergedRunner;
+
+ if (query.context().isBySegment()) {
+ // bySegment: merge all hydrants for a Sink first, then merge Sinks.
Necessary to keep results for the
+ // same segment together, but causes additional memory usage due to
the extra layer of materialization,
+ // so we only do this if we need to.
+ mergedRunner = factory.mergeRunners(
+ queryProcessingPool,
+ allRunners.entrySet().stream().map(
+ entry -> new BySegmentQueryRunner<>(
+ segmentIdMap.get(entry.getKey()),
+ entry.getKey().getInterval().getStart(),
+ factory.mergeRunners(
+ DirectQueryProcessingPool.INSTANCE,
+ entry.getValue()
+ )
+ )
+ ).collect(Collectors.toList())
+ );
+ } else {
+ // Not bySegment: merge all hydrants at the same level, rather than
grouped by Sink (segment).
+ mergedRunner = factory.mergeRunners(
+ queryProcessingPool,
+ new SinkQueryRunners<>(
+ allRunners.entrySet().stream().flatMap(
+ entry ->
+ entry.getValue().stream().map(
+ runner ->
+ Pair.of(entry.getKey().getInterval(), runner)
+ )
+ ).collect(Collectors.toList()))
);
+ }
- return CPUTimeMetricQueryRunner.safeBuild(
- new FinalizeResultsQueryRunner<>(mergedRunner, toolChest),
- toolChest,
- emitter,
- cpuTimeAccumulator,
- true
- );
+ // 1) Merge results using the toolChest, finalize if necessary.
+ // 2) Measure CPU time of that operation.
+ // 3) Release all sink segment references.
+ return QueryRunnerHelper.makeClosingQueryRunner(
+ CPUTimeMetricQueryRunner.safeBuild(
+ new
FinalizeResultsQueryRunner<>(toolChest.mergeResults(mergedRunner), toolChest),
+ toolChest,
+ emitter,
+ cpuTimeAccumulator,
+ true
+ ),
+ () -> CloseableUtils.closeAll(allSegmentReferences)
+ );
+ }
+ catch (Throwable e) {
+ throw CloseableUtils.closeAndWrapInCatch(e, () ->
CloseableUtils.closeAll(allSegmentReferences));
+ }
}
public void registerNewVersionOfPendingSegment(
@@ -309,43 +369,6 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
return dataSource;
}
- /**
- * Decorates a Sink's query runner to emit query/segmentAndCache/time,
query/segment/time, query/wait/time once
- * each for the whole Sink. Also adds CPU time to cpuTimeAccumulator.
- */
- private <T> QueryRunner<T> withPerSinkMetrics(
- final QueryRunner<T> sinkRunner,
- final QueryToolChest<T, ? extends Query<T>> queryToolChest,
- final SegmentId sinkSegmentId,
- final AtomicLong cpuTimeAccumulator
- )
- {
- // Note: reportSegmentAndCacheTime and reportSegmentTime are effectively
the same here. They don't split apart
- // cache vs. non-cache due to the fact that Sinks may be partially cached
and partially uncached. Making this
- // better would need to involve another accumulator like the
cpuTimeAccumulator that we could share with the
- // sinkRunner.
- String sinkSegmentIdString = sinkSegmentId.toString();
- return CPUTimeMetricQueryRunner.safeBuild(
- new MetricsEmittingQueryRunner<>(
- emitter,
- queryToolChest,
- new MetricsEmittingQueryRunner<>(
- emitter,
- queryToolChest,
- sinkRunner,
- QueryMetrics::reportSegmentTime,
- queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
- ),
- QueryMetrics::reportSegmentAndCacheTime,
- queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
- ).withWaitMeasuredFromNow(),
- queryToolChest,
- emitter,
- cpuTimeAccumulator,
- false
- );
- }
-
public VersionedIntervalTimeline<String, Sink> getSinkTimeline()
{
return sinkTimeline;
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index bd829ccfa15..8b5b1482357 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -439,7 +439,6 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
serviceEmitter,
queryRunnerFactoryConglomerateProvider.get(),
queryProcessingPool,
- joinableFactoryWrapper,
Preconditions.checkNotNull(cache, "cache"),
cacheConfig,
cachePopulatorStats
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
index 2194ff70401..a271c4540c5 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
@@ -91,7 +91,6 @@ public class FlushingPlumber extends RealtimePlumber
conglomerate,
segmentAnnouncer,
queryProcessingPool,
- joinableFactory,
null,
null,
null,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
index efabdcc1685..0380abf9f22 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -64,8 +64,6 @@ import
org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
@@ -142,7 +140,6 @@ public class RealtimePlumber implements Plumber
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
QueryProcessingPool queryProcessingPool,
- JoinableFactory joinableFactory,
DataSegmentPusher dataSegmentPusher,
SegmentPublisher segmentPublisher,
SegmentHandoffNotifier handoffNotifier,
@@ -172,7 +169,6 @@ public class RealtimePlumber implements Plumber
emitter,
conglomerate,
queryProcessingPool,
- new JoinableFactoryWrapper(joinableFactory),
cache,
cacheConfig,
cachePopulatorStats
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
index e2ba02cbc0e..8b19153a9de 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
@@ -112,7 +112,6 @@ public class RealtimePlumberSchool implements PlumberSchool
conglomerate,
segmentAnnouncer,
queryProcessingPool,
- joinableFactory,
dataSegmentPusher,
segmentPublisher,
handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()),
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index bcf2f8a2216..e81fd9795d8 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -61,7 +61,6 @@ import
org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
@@ -247,7 +246,6 @@ public class StreamAppenderatorTester implements
AutoCloseable
announcer,
emitter,
new ForwardingQueryProcessingPool(queryExecutor),
- NoopJoinableFactory.INSTANCE,
MapCache.create(2048),
new CacheConfig(),
new CachePopulatorStats(),
@@ -295,7 +293,6 @@ public class StreamAppenderatorTester implements
AutoCloseable
new NoopDataSegmentAnnouncer(),
emitter,
new ForwardingQueryProcessingPool(queryExecutor),
- NoopJoinableFactory.INSTANCE,
MapCache.create(2048),
new CacheConfig(),
new CachePopulatorStats(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]