tobixdev commented on code in PR #18254:
URL: https://github.com/apache/datafusion/pull/18254#discussion_r2466160581


##########
datafusion/physical-plan/src/recursive_query.rs:
##########
@@ -434,5 +452,55 @@ impl RecordBatchStream for RecursiveQueryStream {
     }
 }
 
+/// Deduplicator based on a hash table.
+struct DistinctDeduplicator {
+    /// Grouped rows used for distinct
+    group_values: Box<dyn GroupValues>,
+    reservation: MemoryReservation,
+    intern_output_buffer: Vec<usize>,
+}
+
+impl DistinctDeduplicator {
+    fn new(schema: SchemaRef, task_context: &TaskContext) -> Result<Self> {
+        let group_values = new_group_values(schema, &GroupOrdering::None)?;
+        let reservation = MemoryConsumer::new("RecursiveQueryHashTable")
+            .register(task_context.memory_pool());
+        Ok(Self {
+            group_values,
+            reservation,
+            intern_output_buffer: Vec::new(),
+        })
+    }
+
+    fn deduplicate(&mut self, batch: &RecordBatch) -> Result<RecordBatch> {
+        // We use the hash table to allocate new group ids.
+        // If they are new, i.e., if they have ids >= length before interning, we keep them.
+        // We also detect duplicates by enforcing that group ids are increasing.
+        let size_before = self.group_values.len();
+        self.intern_output_buffer.reserve(batch.num_rows());
+        self.group_values
+            .intern(batch.columns(), &mut self.intern_output_buffer)?;
+        let mask = are_increasing_mask(&self.intern_output_buffer, 
size_before);
+        self.intern_output_buffer.clear();
+        // We update the reservation to reflect the new size of the hash table.
+        self.reservation.try_resize(self.group_values.size())?;
+        Ok(filter_record_batch(batch, &mask)?)
+    }
+}
+
+/// Return a mask, each element true if the value is greater than all previous ones and greater or equal than the min_value
+fn are_increasing_mask(values: &[usize], mut min_value: usize) -> BooleanArray 
{

Review Comment:
   That's perfectly fine. Just a suggestion :+1: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to