Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r550960259
##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -297,69 +340,28 @@ impl RecordBatchStream for HashJoinStream {
/// *
fn build_batch_from_indices(
schema: &Schema,
- left: &Vec<RecordBatch>,
+ left: &RecordBatch,
right: &RecordBatch,
- join_type: &JoinType,
- indices: &[(JoinIndex, RightIndex)],
+ left_indices: UInt64Array,
+ right_indices: UInt32Array,
+ column_indices: &Vec<ColumnIndex>,
) -> ArrowResult<RecordBatch> {
- if left.is_empty() {
- todo!("Create empty record batch");
- }
-
- let (primary_is_left, primary_schema, secondary_schema) = match join_type {
- JoinType::Inner | JoinType::Left => (true, left[0].schema(),
right.schema()),
- JoinType::Right => (false, right.schema(), left[0].schema()),
- };
-
// build the columns of the new [RecordBatch]:
// 1. pick whether the column is from the left or right
- // 2. based on the pick, `take` items from the different recordBatches
+ // 2. based on the pick, `take` items from the different RecordBatches
let mut columns: Vec<Arc<dyn Array>> =
Vec::with_capacity(schema.fields().len());
- let right_indices: UInt32Array =
- indices.iter().map(|(_, join_index)| join_index).collect();
-
- for field in schema.fields() {
- // pick the column (left or right) based on the field name.
- let (is_primary, column_index) = match
primary_schema.index_of(field.name()) {
- Ok(i) => Ok((true, i)),
- Err(_) => {
- match secondary_schema.index_of(field.name()) {
- Ok(i) => Ok((false, i)),
- _ => Err(DataFusionError::Internal(
- format!("During execution, the column {} was not found
in neither the left or right side of the join", field.name()).to_string()
- ))
- }
- }
- }.map_err(DataFusionError::into_arrow_external_error)?;
-
- let is_left =
- (is_primary && primary_is_left) || (!is_primary &&
!primary_is_left);
-
- let array = if is_left {
- // Note that we take `.data_ref()` to gather the [ArrayData] of
each array.
- let arrays = left
- .iter()
- .map(|batch| batch.column(column_index).data_ref().as_ref())
- .collect::<Vec<_>>();
-
- let mut mutable = MutableArrayData::new(arrays, true,
indices.len());
- // use the left indices
- for (join_index, _) in indices {
- match join_index {
- Some((batch, row)) => mutable.extend(*batch, *row, *row +
1),
- None => mutable.extend_nulls(1),
- }
- }
- make_array(Arc::new(mutable.freeze()))
+ for column_index in column_indices {
+ let array = if column_index.is_left {
+ let array = left.column(column_index.index);
+ compute::take(array.as_ref(), &left_indices, None)?
} else {
- // use the right indices
- let array = right.column(column_index);
+ let array = right.column(column_index.index);
compute::take(array.as_ref(), &right_indices, None)?
};
columns.push(array);
}
- Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
+ RecordBatch::try_new(Arc::new(schema.clone()), columns)
}
/// Create a key `Vec<u8>` that is used as key for the hashmap
Review comment:
Currently looks like creating this key / hashing is most expensive part
of the queries.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]