imply-cheddar commented on code in PR #13085:
URL: https://github.com/apache/druid/pull/13085#discussion_r997779323


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -146,18 +147,30 @@ protected SegmentReference mapSegment(final Segment segment)
 
   private boolean initializeSegmentMapFn(final IntSet readableInputs)
   {
+    final AtomicLong cpuAccumulator = new AtomicLong();
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
       segmentMapFn = Function.identity();
       return true;
     } else {
       final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
-
+      DataSource inlineChannelDataSource = broadcastJoinHelper.inlineChannelData(query.getDataSource());
       if (retVal) {
-        segmentMapFn = broadcastJoinHelper.makeSegmentMapFn(query);
+        if (inlineChannelDataSource instanceof InputNumberDataSource) {
+          InputNumberDataSource inputNumberDataSource = (InputNumberDataSource) query.getDataSource();
+          // The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
+          // segment map function.  It would be a lot better if the InputNumberDataSource actually
+          // had a way to get that injected into it on its own, but the relationship between these objects
+          // was figured out during a refactor and using a setter here seemed like the least-bad way to
+          // make progress on the refactor without breaking functionality.  Hopefully, some future
+          // developer will move this away from a setter.
+          inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
+          segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
+        } else {
+          segmentMapFn = inlineChannelDataSource.createSegmentMapFunction(query, cpuAccumulator);

Review Comment:
   Did we find a test that was broken with the previous code?  If not, then 
please revert to the code that @abhishekagarwal87 initially commented on 
and keep that code active.  We should not treat that code as broken until we 
can actually reproduce the breakage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

