This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 01e9d963bd8 Merge hydrant runners flatly for realtime queries. (#15757)
01e9d963bd8 is described below

commit 01e9d963bd8615ccf035c2f301d87fc193837f7a
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Jan 25 03:07:57 2024 -0800

    Merge hydrant runners flatly for realtime queries. (#15757)
    
    * Merge hydrant runners flatly for realtime queries.
    
    Prior to this patch, we had two layers of mergeRunners for realtime
    queries: one within each Sink (a logical segment) and one across all
    Sinks. This was done to keep metrics and results grouped by Sink,
    given that each FireHydrant within a Sink has its own separate storage
    adapter.
    
    However, this costs extra memory due to the extra layer of
    materialization. The cost is especially pronounced for groupBy queries,
    which use their merge buffers only at the top layer of merging. The
    lower layer of merging materializes ResultRows directly on the heap,
    which can cause heap exhaustion if there are enough ResultRows.
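    
    As a rough illustration of the two merge shapes, here is a
    self-contained toy sketch. It is not Druid's actual QueryRunner /
    mergeRunners API: the Runner interface, the merge() helper, and the
    row strings are invented for the example, and modeling every merge
    layer as full materialization is deliberate, to make the heap cost of
    the inner (per-Sink) layer visible.
    
        import java.util.ArrayList;
        import java.util.Collections;
        import java.util.List;
        import java.util.function.Supplier;
        
        public class MergeShapeSketch
        {
          // Toy stand-in for a QueryRunner: supplies result rows.
          interface Runner extends Supplier<List<String>> {}
        
          // Toy stand-in for factory.mergeRunners(): merges child runners.
          static Runner merge(List<Runner> children)
          {
            return () -> {
              final List<String> rows = new ArrayList<>();
              for (Runner child : children) {
                rows.addAll(child.get()); // rows accumulate on the heap here
              }
              Collections.sort(rows);
              return rows;
            };
          }
        
          public static void main(String[] args)
          {
            // Two Sinks, each holding two FireHydrants.
            final List<List<Runner>> sinks = List.of(
                List.of(() -> List.of("a"), () -> List.of("c")),
                List.of(() -> List.of("b"), () -> List.of("d"))
            );
        
            // Old shape: merge hydrants within each Sink, then merge Sinks.
            final List<Runner> perSink = new ArrayList<>();
            for (List<Runner> hydrants : sinks) {
              perSink.add(merge(hydrants)); // extra layer of materialization
            }
            System.out.println(merge(perSink).get()); // [a, b, c, d]
        
            // New shape (bySegment: false): one flat merge across hydrants.
            final List<Runner> flat = new ArrayList<>();
            sinks.forEach(flat::addAll);
            System.out.println(merge(flat).get()); // [a, b, c, d]
          }
        }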
    
    This patch switches to a single layer of merging when bySegment: false,
    just as Historicals do. To accommodate that, segment metrics such as
    query/segment/time are now emitted per-FireHydrant instead of per-Sink.
    
    Two layers of merging are retained when bySegment: true. That case is
    uncommon, because bySegment is typically only set when segment-level
    caching is enabled on the Broker, which is off by default.
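    
    The resulting branch can be modeled with a similar toy sketch (again
    an invented model, not the real API: flatMerge() stands in for
    factory.mergeRunners(), the string keys stand in for Sink segment ids,
    and only isBySegment(), BySegmentQueryRunner, and the allRunners map
    correspond to names in the patch):
    
        import java.util.ArrayList;
        import java.util.LinkedHashMap;
        import java.util.List;
        import java.util.Map;
        
        public class BySegmentBranchSketch
        {
          // Toy merge: concatenates hydrant results in a single layer.
          static List<String> flatMerge(List<List<String>> hydrantResults)
          {
            final List<String> out = new ArrayList<>();
            hydrantResults.forEach(out::addAll);
            return out;
          }
        
          public static void main(String[] args)
          {
            // Hydrant results grouped by Sink, mirroring the patch's
            // LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>>.
            final Map<String, List<List<String>>> allRunners = new LinkedHashMap<>();
            allRunners.put("seg1", List.of(List.of("a"), List.of("c")));
            allRunners.put("seg2", List.of(List.of("b"), List.of("d")));
        
            final boolean bySegment = false; // query.context().isBySegment()
        
            if (bySegment) {
              // Two layers: merge each Sink's hydrants first so its results
              // stay grouped per segment, as BySegmentQueryRunner requires.
              for (Map.Entry<String, List<List<String>>> e : allRunners.entrySet()) {
                System.out.println(e.getKey() + " -> " + flatMerge(e.getValue()));
              }
            } else {
              // One layer: every hydrant feeds the top-level merge directly.
              final List<List<String>> everyHydrant = new ArrayList<>();
              allRunners.values().forEach(everyHydrant::addAll);
              System.out.println(flatMerge(everyHydrant)); // [a, c, b, d]
            }
          }
        }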
    
    * Use SinkQueryRunners.
    
    * Remove unused method.
---
 .../common/task/TestAppenderatorsManager.java      |   1 -
 .../realtime/appenderator/Appenderators.java       |   4 -
 .../DefaultRealtimeAppenderatorFactory.java        |   1 -
 .../appenderator/PeonAppenderatorsManager.java     |   1 -
 .../appenderator/SinkQuerySegmentWalker.java       | 303 +++++++++++----------
 .../UnifiedIndexerAppenderatorsManager.java        |   1 -
 .../segment/realtime/plumber/FlushingPlumber.java  |   1 -
 .../segment/realtime/plumber/RealtimePlumber.java  |   4 -
 .../realtime/plumber/RealtimePlumberSchool.java    |   1 -
 .../appenderator/StreamAppenderatorTester.java     |   3 -
 10 files changed, 163 insertions(+), 157 deletions(-)

diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index be03fafd357..f54b0ecd2af 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -89,7 +89,6 @@ public class TestAppenderatorsManager implements AppenderatorsManager
         segmentAnnouncer,
         emitter,
         queryProcessingPool,
-        joinableFactory,
         cache,
         cacheConfig,
         cachePopulatorStats,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index 47cd058b04d..51115c48bae 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -32,8 +32,6 @@ import org.apache.druid.segment.IndexMerger;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
@@ -58,7 +56,6 @@ public class Appenderators
       DataSegmentAnnouncer segmentAnnouncer,
       ServiceEmitter emitter,
       QueryProcessingPool queryProcessingPool,
-      JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
       CachePopulatorStats cachePopulatorStats,
@@ -86,7 +83,6 @@ public class Appenderators
             emitter,
             conglomerate,
             queryProcessingPool,
-            new JoinableFactoryWrapper(joinableFactory),
             Preconditions.checkNotNull(cache, "cache"),
             cacheConfig,
             cachePopulatorStats
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
index 960779fbf16..e64c315484d 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
@@ -115,7 +115,6 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
         segmentAnnouncer,
         emitter,
         queryProcessingPool,
-        joinableFactory,
         cache,
         cacheConfig,
         cachePopulatorStats,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 070ac62568a..dba96acc66a 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -105,7 +105,6 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
           segmentAnnouncer,
           emitter,
           queryProcessingPool,
-          joinableFactory,
           cache,
           cacheConfig,
           cachePopulatorStats,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 7c81b60feab..1bf07fa4146 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.realtime.appenderator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
 import org.apache.druid.client.CachingQueryRunner;
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.CacheConfig;
@@ -59,7 +58,6 @@ import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.StorageAdapter;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.segment.realtime.plumber.Sink;
 import org.apache.druid.segment.realtime.plumber.SinkSegmentReference;
@@ -69,13 +67,18 @@ import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.utils.CloseableUtils;
 import org.joda.time.Interval;
 
-import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Query handler for indexing tasks.
@@ -92,7 +95,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
   private final ServiceEmitter emitter;
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final QueryProcessingPool queryProcessingPool;
-  private final JoinableFactoryWrapper joinableFactoryWrapper;
   private final Cache cache;
   private final CacheConfig cacheConfig;
   private final CachePopulatorStats cachePopulatorStats;
@@ -106,7 +108,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
       ServiceEmitter emitter,
       QueryRunnerFactoryConglomerate conglomerate,
       QueryProcessingPool queryProcessingPool,
-      JoinableFactoryWrapper joinableFactoryWrapper,
       Cache cache,
       CacheConfig cacheConfig,
       CachePopulatorStats cachePopulatorStats
@@ -118,7 +119,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
     this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
     this.queryProcessingPool = Preconditions.checkNotNull(queryProcessingPool, "queryProcessingPool");
-    this.joinableFactoryWrapper = joinableFactoryWrapper;
     this.cache = Preconditions.checkNotNull(cache, "cache");
     this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
     this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
@@ -186,110 +186,170 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
     // We compute the join cache key here itself so it doesn't need to be re-computed for every segment
     final Optional<byte[]> cacheKeyPrefix = Optional.ofNullable(query.getDataSource().getCacheKey());
 
-    Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
-        specs,
-        newDescriptor -> {
-          final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
-          final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
-              descriptor.getInterval(),
-              descriptor.getVersion(),
-              descriptor.getPartitionNumber()
+    // We need to report data for each Sink all-or-nothing, which means we need to acquire references for all
+    // subsegments (FireHydrants) of a segment (Sink) at once. To ensure they are properly released even when a
+    // query fails or is canceled, we acquire *all* sink references upfront, and release them all when the main
+    // QueryRunner returned by this method is closed. (We can't do the acquisition and releasing at the level of
+    // each FireHydrant's runner, since then it wouldn't be properly all-or-nothing on a per-Sink basis.)
+    final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
+    final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
+    final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
+
+    try {
+      for (final SegmentDescriptor newDescriptor : specs) {
+        final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
+        final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
+            descriptor.getInterval(),
+            descriptor.getVersion(),
+            descriptor.getPartitionNumber()
+        );
+
+        if (chunk == null) {
+          allRunners.put(
+              descriptor,
+              Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor))
+          );
+          continue;
+        }
+
+        final Sink theSink = chunk.getObject();
+        final SegmentId sinkSegmentId = theSink.getSegment().getId();
+        segmentIdMap.put(descriptor, sinkSegmentId);
+        final List<SinkSegmentReference> sinkSegmentReferences =
+            theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment);
+
+        if (sinkSegmentReferences == null) {
+          // We failed to acquire references for all subsegments. Bail and report the entire sink missing.
+          allRunners.put(
+              descriptor,
+              Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor))
           );
+        } else if (sinkSegmentReferences.isEmpty()) {
+          allRunners.put(descriptor, Collections.singletonList(new NoopQueryRunner<>()));
+        } else {
+          allSegmentReferences.addAll(sinkSegmentReferences);
 
-          if (chunk == null) {
-            return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
-          }
-
-          final Sink theSink = chunk.getObject();
-          final SegmentId sinkSegmentId = theSink.getSegment().getId();
-          final List<SinkSegmentReference> segmentReferences =
-              theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment);
-
-          if (segmentReferences == null) {
-            // We failed to acquire references for all subsegments. Bail and report the entire sink missing.
-            return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
-          } else if (segmentReferences.isEmpty()) {
-            return new NoopQueryRunner<>();
-          }
-
-          final Closeable releaser = () -> CloseableUtils.closeAll(segmentReferences);
-
-          try {
-            Iterable<QueryRunner<T>> perHydrantRunners = new SinkQueryRunners<>(
-                Iterables.transform(
-                    segmentReferences,
-                    segmentReference -> {
-                      QueryRunner<T> runner = factory.createRunner(segmentReference.getSegment());
-
-                      // 1) Only use caching if data is immutable
-                      // 2) Hydrants are not the same between replicas, make sure cache is local
-                      if (segmentReference.isImmutable() && cache.isLocal()) {
-                        StorageAdapter storageAdapter = segmentReference.getSegment().asStorageAdapter();
-                        long segmentMinTime = storageAdapter.getMinTime().getMillis();
-                        long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
-                        Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
-                        runner = new CachingQueryRunner<>(
-                            makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()),
-                            cacheKeyPrefix,
-                            descriptor,
-                            actualDataInterval,
-                            objectMapper,
-                            cache,
-                            toolChest,
-                            runner,
-                            // Always populate in foreground regardless of config
-                            new ForegroundCachePopulator(
-                                objectMapper,
-                                cachePopulatorStats,
-                                cacheConfig.getMaxEntrySize()
-                            ),
-                            cacheConfig
-                        );
-                      }
-                      return new Pair<>(segmentReference.getSegment().getDataInterval(), runner);
+          allRunners.put(
+              descriptor,
+              sinkSegmentReferences.stream().map(
+                  segmentReference -> {
+                    QueryRunner<T> runner = new MetricsEmittingQueryRunner<>(
+                        emitter,
+                        factory.getToolchest(),
+                        factory.createRunner(segmentReference.getSegment()),
+                        QueryMetrics::reportSegmentTime,
+                        queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
+                    );
+
+                    // 1) Only use caching if data is immutable
+                    // 2) Hydrants are not the same between replicas, make sure cache is local
+                    if (segmentReference.isImmutable() && cache.isLocal()) {
+                      StorageAdapter storageAdapter = segmentReference.getSegment().asStorageAdapter();
+                      long segmentMinTime = storageAdapter.getMinTime().getMillis();
+                      long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
+                      Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
+                      runner = new CachingQueryRunner<>(
+                          makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()),
+                          cacheKeyPrefix,
+                          descriptor,
+                          actualDataInterval,
+                          objectMapper,
+                          cache,
+                          toolChest,
+                          runner,
+                          // Always populate in foreground regardless of config
+                          new ForegroundCachePopulator(
+                              objectMapper,
+                              cachePopulatorStats,
+                              cacheConfig.getMaxEntrySize()
+                          ),
+                          cacheConfig
+                      );
                     }
-                )
-            );
-            return QueryRunnerHelper.makeClosingQueryRunner(
-                new SpecificSegmentQueryRunner<>(
-                    withPerSinkMetrics(
-                        new BySegmentQueryRunner<>(
-                            sinkSegmentId,
-                            descriptor.getInterval().getStart(),
-                            factory.mergeRunners(
-                                DirectQueryProcessingPool.INSTANCE,
-                                perHydrantRunners
-                            )
-                        ),
+
+                    // Regardless of whether caching is enabled, do reportSegmentAndCacheTime outside the
+                    // *possible* caching.
+                    runner = new MetricsEmittingQueryRunner<>(
+                        emitter,
+                        factory.getToolchest(),
+                        runner,
+                        QueryMetrics::reportSegmentAndCacheTime,
+                        queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
+                    ).withWaitMeasuredFromNow();
+
+                    // Emit CPU time metrics.
+                    runner = CPUTimeMetricQueryRunner.safeBuild(
+                        runner,
                         toolChest,
-                        sinkSegmentId,
-                        cpuTimeAccumulator
-                    ),
-                    new SpecificSegmentSpec(descriptor)
-                ),
-                releaser
-            );
-          }
-          catch (Throwable e) {
-            throw CloseableUtils.closeAndWrapInCatch(e, releaser);
-          }
+                        emitter,
+                        cpuTimeAccumulator,
+                        false
+                    );
+
+                    // Run with specific segment descriptor.
+                    runner = new SpecificSegmentQueryRunner<>(
+                        runner,
+                        new SpecificSegmentSpec(descriptor)
+                    );
+
+                    return runner;
+                  }
+              ).collect(Collectors.toList())
+          );
         }
-    );
-    final QueryRunner<T> mergedRunner =
-        toolChest.mergeResults(
-            factory.mergeRunners(
-                queryProcessingPool,
-                perSegmentRunners
-            )
+      }
+
+      final QueryRunner<T> mergedRunner;
+
+      if (query.context().isBySegment()) {
+        // bySegment: merge all hydrants for a Sink first, then merge Sinks. Necessary to keep results for the
+        // same segment together, but causes additional memory usage due to the extra layer of materialization,
+        // so we only do this if we need to.
+        mergedRunner = factory.mergeRunners(
+            queryProcessingPool,
+            allRunners.entrySet().stream().map(
+                entry -> new BySegmentQueryRunner<>(
+                    segmentIdMap.get(entry.getKey()),
+                    entry.getKey().getInterval().getStart(),
+                    factory.mergeRunners(
+                        DirectQueryProcessingPool.INSTANCE,
+                        entry.getValue()
+                    )
+                )
+            ).collect(Collectors.toList())
+        );
+      } else {
+        // Not bySegment: merge all hydrants at the same level, rather than grouped by Sink (segment).
+        mergedRunner = factory.mergeRunners(
+            queryProcessingPool,
+            new SinkQueryRunners<>(
+                allRunners.entrySet().stream().flatMap(
+                    entry ->
+                        entry.getValue().stream().map(
+                            runner ->
+                                Pair.of(entry.getKey().getInterval(), runner)
+                        )
+                ).collect(Collectors.toList()))
         );
+      }
 
-    return CPUTimeMetricQueryRunner.safeBuild(
-        new FinalizeResultsQueryRunner<>(mergedRunner, toolChest),
-        toolChest,
-        emitter,
-        cpuTimeAccumulator,
-        true
-    );
+      // 1) Merge results using the toolChest, finalize if necessary.
+      // 2) Measure CPU time of that operation.
+      // 3) Release all sink segment references.
+      return QueryRunnerHelper.makeClosingQueryRunner(
+          CPUTimeMetricQueryRunner.safeBuild(
+              new FinalizeResultsQueryRunner<>(toolChest.mergeResults(mergedRunner), toolChest),
+              toolChest,
+              emitter,
+              cpuTimeAccumulator,
+              true
+          ),
+          () -> CloseableUtils.closeAll(allSegmentReferences)
+      );
+    }
+    catch (Throwable e) {
+      throw CloseableUtils.closeAndWrapInCatch(e, () -> CloseableUtils.closeAll(allSegmentReferences));
+    }
   }
 
   public void registerNewVersionOfPendingSegment(
@@ -309,43 +369,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
     return dataSource;
   }
 
-  /**
-   * Decorates a Sink's query runner to emit query/segmentAndCache/time, query/segment/time, query/wait/time once
-   * each for the whole Sink. Also adds CPU time to cpuTimeAccumulator.
-   */
-  private <T> QueryRunner<T> withPerSinkMetrics(
-      final QueryRunner<T> sinkRunner,
-      final QueryToolChest<T, ? extends Query<T>> queryToolChest,
-      final SegmentId sinkSegmentId,
-      final AtomicLong cpuTimeAccumulator
-  )
-  {
-    // Note: reportSegmentAndCacheTime and reportSegmentTime are effectively the same here. They don't split apart
-    // cache vs. non-cache due to the fact that Sinks may be partially cached and partially uncached. Making this
-    // better would need to involve another accumulator like the cpuTimeAccumulator that we could share with the
-    // sinkRunner.
-    String sinkSegmentIdString = sinkSegmentId.toString();
-    return CPUTimeMetricQueryRunner.safeBuild(
-        new MetricsEmittingQueryRunner<>(
-            emitter,
-            queryToolChest,
-            new MetricsEmittingQueryRunner<>(
-                emitter,
-                queryToolChest,
-                sinkRunner,
-                QueryMetrics::reportSegmentTime,
-                queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
-            ),
-            QueryMetrics::reportSegmentAndCacheTime,
-            queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
-        ).withWaitMeasuredFromNow(),
-        queryToolChest,
-        emitter,
-        cpuTimeAccumulator,
-        false
-    );
-  }
-
   public VersionedIntervalTimeline<String, Sink> getSinkTimeline()
   {
     return sinkTimeline;
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index bd829ccfa15..8b5b1482357 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -439,7 +439,6 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
           serviceEmitter,
           queryRunnerFactoryConglomerateProvider.get(),
           queryProcessingPool,
-          joinableFactoryWrapper,
           Preconditions.checkNotNull(cache, "cache"),
           cacheConfig,
           cachePopulatorStats
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
index 2194ff70401..a271c4540c5 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
@@ -91,7 +91,6 @@ public class FlushingPlumber extends RealtimePlumber
         conglomerate,
         segmentAnnouncer,
         queryProcessingPool,
-        joinableFactory,
         null,
         null,
         null,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
index efabdcc1685..0380abf9f22 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -64,8 +64,6 @@ import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.FireHydrant;
@@ -142,7 +140,6 @@ public class RealtimePlumber implements Plumber
       QueryRunnerFactoryConglomerate conglomerate,
       DataSegmentAnnouncer segmentAnnouncer,
       QueryProcessingPool queryProcessingPool,
-      JoinableFactory joinableFactory,
       DataSegmentPusher dataSegmentPusher,
       SegmentPublisher segmentPublisher,
       SegmentHandoffNotifier handoffNotifier,
@@ -172,7 +169,6 @@ public class RealtimePlumber implements Plumber
         emitter,
         conglomerate,
         queryProcessingPool,
-        new JoinableFactoryWrapper(joinableFactory),
         cache,
         cacheConfig,
         cachePopulatorStats
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
index e2ba02cbc0e..8b19153a9de 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
@@ -112,7 +112,6 @@ public class RealtimePlumberSchool implements PlumberSchool
         conglomerate,
         segmentAnnouncer,
         queryProcessingPool,
-        joinableFactory,
         dataSegmentPusher,
+        segmentPublisher,
         handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()),
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index bcf2f8a2216..e81fd9795d8 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -61,7 +61,6 @@ import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
@@ -247,7 +246,6 @@ public class StreamAppenderatorTester implements AutoCloseable
           announcer,
           emitter,
           new ForwardingQueryProcessingPool(queryExecutor),
-          NoopJoinableFactory.INSTANCE,
           MapCache.create(2048),
           new CacheConfig(),
           new CachePopulatorStats(),
@@ -295,7 +293,6 @@ public class StreamAppenderatorTester implements AutoCloseable
           new NoopDataSegmentAnnouncer(),
           emitter,
           new ForwardingQueryProcessingPool(queryExecutor),
-          NoopJoinableFactory.INSTANCE,
           MapCache.create(2048),
           new CacheConfig(),
           new CachePopulatorStats(),

