github-code-scanning[bot] commented on code in PR #13694: URL: https://github.com/apache/druid/pull/13694#discussion_r1080744790
########## processing/src/main/java/org/apache/druid/query/operator/join/SortedInnerJoinOperator.java: ########## @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator.join; + +import org.apache.druid.collections.fastutil.DruidIntList; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.operator.Operator; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RearrangedRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.semantic.SortedMatrixMaker; +import org.apache.druid.query.rowsandcols.util.FindResult; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * An operator that can join the data streams from other 
operators. Data must be provided in the same sorted + * order between the different operators. + * <p> + * Performs an InnerJoin based on the equality of two sets of fields. Null is considered a meaningful value + * when comparing the two streams. If null is intended to be excluded, it should be removed through a filter. + * <p> + * This class was created more as an exercise in ensuring that something meaningful can be made to do combinations + * of Operators (and ensure the interface is correct). There are tests that show that this class works, but those + * tests are not (yet) considered exhaustive. This paragraph in the comments should exist as a cautionary indication + * that if/when this class is dusted off for use again, there might be bugs yet lurking and it should likely start + * with fleshing out the tests. + */ +public class SortedInnerJoinOperator implements Operator +{ + private static final Logger log = new Logger(SortedInnerJoinOperator.class); + + private final ArrayList<JoinPart> parts; + private final JoinConfig config; + + public SortedInnerJoinOperator( + List<JoinPartDefn> partDefns, + JoinConfig config + ) + { + this.parts = new ArrayList<>(partDefns.size()); + for (JoinPartDefn partDefn : partDefns) { + parts.add(new JoinPart(partDefn.getOp(), partDefn.getJoinFields(), partDefn.getProjectFields())); + } + + this.config = config; + } + + @Override + public Closeable goOrContinue(Closeable continuation, Receiver receiver) + { + JoinLogic joinLogic; + if (continuation == null) { + joinLogic = new JoinLogic(config, parts); + } else { + joinLogic = (JoinLogic) continuation; + } + + try { + joinLogic.go(receiver); + switch (joinLogic.state) { + case NEEDS_DATA: + case READY: + throw new ISE("joinLogic.go() exited with state[%s], should never happen.", joinLogic.state); + case COMPLETE: + return null; + case PAUSED: + return joinLogic; + default: + throw new ISE("Unknown state[%s]", joinLogic.state); + } + } + catch (RuntimeException e) { + try { + 
joinLogic.close(); + } + catch (Exception ex) { + e.addSuppressed(ex); + } + throw e; + } + } + + private static class JoinLogic implements Closeable + { + private final JoinConfig config; + + private final ArrayList<JoinPart> joinParts; + + private State state; + private int nextPositionToLoad; + + private JoinLogic( + JoinConfig config, + ArrayList<JoinPart> joinParts + ) + { + this.config = config; + this.joinParts = joinParts; + + setNextPositionToLoad(joinParts.size() - 1); + } + + public void go(Receiver receiver) + { + while (state != State.COMPLETE) { + final int position = nextPositionToLoad; + final JoinPart joinPart = joinParts.get(position); + //noinspection VariableNotUsedInsideIf + if (joinPart.curr != null) { + throw new ISE("loading data for position[%d], but it already had data!? Probably a bug!", position); + } + + joinPart.goOrContinue(new Receiver() + { + @Override + public Signal push(RowsAndColumns rac) + { + joinPart.setCurr(rac); + process(receiver); + + switch (state) { + case COMPLETE: + return Signal.STOP; + case NEEDS_DATA: + return Signal.GO; + case PAUSED: + return Signal.PAUSE; + case READY: + throw new ISE("Was in state READY after process returned!?"); + default: + throw new ISE("Unknown state[%s]", state); + } + } + + @Override + public void completed() + { + joinPart.complete.set(true); + } + }); + + if (joinPart.continuation == null && joinPart.needsData() && joinPart.isComplete()) { + // In this case, (1) the op returned null, (2) we don't have anything buffered to process and (3) completed + // was called. We are done. + state = State.COMPLETE; + } + + switch (state) { Review Comment: ## Missing enum case in switch Switch statement does not have a case for [NEEDS_DATA](1). 
[Show more details](https://github.com/apache/druid/security/code-scanning/3698) ########## processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java: ########## @@ -72,13 +85,14 @@ @Override public int numRows() { - return pointers.length; + return end - start; Review Comment: ## User-controlled data in arithmetic expression This arithmetic expression depends on a [user-provided value](1), potentially causing an underflow. This arithmetic expression depends on a [user-provided value](2), potentially causing an underflow. This arithmetic expression depends on a [user-provided value](3), potentially causing an underflow. This arithmetic expression depends on a [user-provided value](4), potentially causing an underflow. This arithmetic expression depends on a [user-provided value](5), potentially causing an underflow. [Show more details](https://github.com/apache/druid/security/code-scanning/3700) ########## processing/src/main/java/org/apache/druid/query/operator/join/SortedInnerJoinOperator.java: ########## @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.operator.join; + +import org.apache.druid.collections.fastutil.DruidIntList; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.operator.Operator; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RearrangedRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.semantic.SortedMatrixMaker; +import org.apache.druid.query.rowsandcols.util.FindResult; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * An operator that can join the data streams from other operators. Data must be provided in the same sorted + * order between the different operators. + * <p> + * Performs an InnerJoin based on the equality of two sets of fields. Null is considered a meaningful value + * when comparing the two streams. If null is intended to be excluded, it should be removed through a filter. + * <p> + * This class was created more as an exercise in ensuring that something meaningful can be made to do combinations + * of Operators (and ensure the interface is correct). There are tests that show that this class works, but those + * tests are not (yet) considered exhaustive. This paragraph in the comments should exist as a cautionary indication + * that if/when this class is dusted off for use again, there might be bugs yet lurking and it should likely start + * with fleshing out the tests. 
+ */ +public class SortedInnerJoinOperator implements Operator +{ + private static final Logger log = new Logger(SortedInnerJoinOperator.class); + + private final ArrayList<JoinPart> parts; + private final JoinConfig config; + + public SortedInnerJoinOperator( + List<JoinPartDefn> partDefns, + JoinConfig config + ) + { + this.parts = new ArrayList<>(partDefns.size()); + for (JoinPartDefn partDefn : partDefns) { + parts.add(new JoinPart(partDefn.getOp(), partDefn.getJoinFields(), partDefn.getProjectFields())); + } + + this.config = config; + } + + @Override + public Closeable goOrContinue(Closeable continuation, Receiver receiver) + { + JoinLogic joinLogic; + if (continuation == null) { + joinLogic = new JoinLogic(config, parts); + } else { + joinLogic = (JoinLogic) continuation; + } + + try { + joinLogic.go(receiver); + switch (joinLogic.state) { + case NEEDS_DATA: + case READY: + throw new ISE("joinLogic.go() exited with state[%s], should never happen.", joinLogic.state); + case COMPLETE: + return null; + case PAUSED: + return joinLogic; + default: + throw new ISE("Unknown state[%s]", joinLogic.state); + } + } + catch (RuntimeException e) { + try { + joinLogic.close(); + } + catch (Exception ex) { + e.addSuppressed(ex); + } + throw e; + } + } + + private static class JoinLogic implements Closeable + { + private final JoinConfig config; + + private final ArrayList<JoinPart> joinParts; + + private State state; + private int nextPositionToLoad; + + private JoinLogic( + JoinConfig config, + ArrayList<JoinPart> joinParts + ) + { + this.config = config; + this.joinParts = joinParts; + + setNextPositionToLoad(joinParts.size() - 1); + } + + public void go(Receiver receiver) + { + while (state != State.COMPLETE) { + final int position = nextPositionToLoad; + final JoinPart joinPart = joinParts.get(position); + //noinspection VariableNotUsedInsideIf + if (joinPart.curr != null) { + throw new ISE("loading data for position[%d], but it already had data!? 
Probably a bug!", position); + } + + joinPart.goOrContinue(new Receiver() + { + @Override + public Signal push(RowsAndColumns rac) + { + joinPart.setCurr(rac); + process(receiver); + + switch (state) { + case COMPLETE: + return Signal.STOP; + case NEEDS_DATA: + return Signal.GO; + case PAUSED: + return Signal.PAUSE; + case READY: + throw new ISE("Was in state READY after process returned!?"); + default: + throw new ISE("Unknown state[%s]", state); + } + } + + @Override + public void completed() + { + joinPart.complete.set(true); + } + }); + + if (joinPart.continuation == null && joinPart.needsData() && joinPart.isComplete()) { + // In this case, (1) the op returned null, (2) we don't have anything buffered to process and (3) completed + // was called. We are done. + state = State.COMPLETE; + } + + switch (state) { + case READY: + throw new ISE("Don't expect READY state here, process() should've changed it to something else"); + case PAUSED: + return; + case COMPLETE: + receiver.completed(); + + try { + close(); + } + catch (IOException e) { + log.warn("Problem closing a join part[%d], ignoring because we are done anyway.", position); + } + break; + } + } + } + + /** + * Processes whatever it can from the buffers, pushes the created RowsAndColumns to the receiver and + * returns which side we need more data from. + * <p> + * updates {@link #nextPositionToLoad} to a positive number if there is another position to load. + * sets it to -1 if processing is complete. 
+ */ + private void process(Receiver receiver) + { + // First check that we have something to work with for all parts of the join + for (int i = joinParts.size() - 1; i >= 0; --i) { + final JoinPart joinPart = joinParts.get(i); + if (joinPart.needsData()) { + if (joinPart.isComplete()) { + state = State.COMPLETE; + } else { + setNextPositionToLoad(i); + } + return; + } + } + state = State.READY; + + DruidIntList[] rowsToInclude = new DruidIntList[joinParts.size()]; + for (int i = 0; i < rowsToInclude.length; ++i) { + rowsToInclude[i] = new DruidIntList(config.getBufferSize()); + if (joinParts.get(i).needsData()) { + throw new ISE("doJoin called while joinPart[%d] needed data. This is likely a bug", i); + } + } + + final int finalIndex = joinParts.size() - 1; + final JoinPart finalPart = joinParts.get(finalIndex); + SortedMatrixMaker.SortedMatrix.MatrixRow row = null; + while (state == State.READY) { + if (row == null) { + row = finalPart.currMatrix.getRow(finalPart.currRowIndex); + } + + row = joinRows(receiver, finalIndex, rowsToInclude, row); + } + + pushRows(receiver, rowsToInclude); + } + + private void pushRows(Receiver receiver, DruidIntList[] rowsToInclude) + { + int size = rowsToInclude[0].size(); + if (size == 0) { + return; + } + + LinkedHashMap<String, Column> cols = new LinkedHashMap<>(); + + for (int i = 0; i < joinParts.size(); ++i) { + final JoinPart part = joinParts.get(i); + RowsAndColumns remapped = + new RearrangedRowsAndColumns(rowsToInclude[i].elements(), 0, rowsToInclude[i].size(), part.curr); + + for (String field : part.projectFields) { + cols.put(field, remapped.findColumn(field)); + } + } + + final Signal signal = receiver.push(new MapOfColumnsRowsAndColumns(cols, size)); + switch (signal) { Review Comment: ## Missing enum case in switch Switch statement does not have a case for [GO](1). [Show more details](https://github.com/apache/druid/security/code-scanning/3699) -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
