tustvold commented on code in PR #8021:
URL: https://github.com/apache/arrow-datafusion/pull/8021#discussion_r1379310066


##########
datafusion/core/tests/fifo.rs:
##########
@@ -17,32 +17,208 @@
 
 //! This test demonstrates the DataFusion FIFO capabilities.
 //!
-#[cfg(not(target_os = "windows"))]
+#[cfg(target_family = "unix")]
 #[cfg(test)]
 mod unix_test {
     use arrow::array::Array;
-    use arrow::csv::ReaderBuilder;
+    use arrow::csv::{ReaderBuilder, WriterBuilder};
     use arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::test_util::register_unbounded_file_with_ordering;
+    use arrow_array::RecordBatch;
+    use arrow_schema::{SchemaRef, SortOptions};
+    use async_trait::async_trait;
+    use datafusion::datasource::provider::TableProviderFactory;
+    use datafusion::datasource::TableProvider;
+    use datafusion::execution::context::SessionState;
     use datafusion::{
+        physical_plan,
         prelude::{CsvReadOptions, SessionConfig, SessionContext},
         test_util::{aggr_test_schema, arrow_test_data},
     };
     use datafusion_common::{exec_err, DataFusionError, Result};
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+    use datafusion_expr::{CreateExternalTable, Expr, TableType};
+    use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
+    use datafusion_physical_plan::common::AbortOnDropSingle;
+    use datafusion_physical_plan::insert::{DataSink, FileSinkExec};
+    use datafusion_physical_plan::metrics::MetricsSet;
+    use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
+    use datafusion_physical_plan::streaming::{PartitionStream, 
StreamingTableExec};
+    use datafusion_physical_plan::{DisplayAs, DisplayFormatType, 
ExecutionPlan};
     use futures::StreamExt;
-    use itertools::enumerate;
     use nix::sys::stat;
     use nix::unistd;
-    use rstest::*;
+    use std::any::Any;
+    use std::collections::HashMap;
+    use std::fmt::Formatter;
     use std::fs::{File, OpenOptions};
     use std::io::Write;
     use std::path::PathBuf;
-    use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::Arc;
-    use std::thread;
-    use std::thread::JoinHandle;
-    use std::time::{Duration, Instant};
     use tempfile::TempDir;
+    use tokio::task::{spawn_blocking, JoinHandle};
+
+    #[derive(Default)]
+    struct FifoFactory {}
+
+    #[async_trait]
+    impl TableProviderFactory for FifoFactory {
+        async fn create(
+            &self,
+            _state: &SessionState,
+            cmd: &CreateExternalTable,
+        ) -> Result<Arc<dyn TableProvider>> {
+            let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
+            let location = cmd.location.clone();
+            Ok(fifo_table(schema, location, None))
+        }
+    }
+
+    #[derive(Debug)]
+    struct FifoConfig {
+        schema: SchemaRef,
+        location: PathBuf,
+        sort: Option<LexOrdering>,
+    }
+
+    struct FifoTable(Arc<FifoConfig>);
+
+    #[async_trait]
+    impl TableProvider for FifoTable {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn schema(&self) -> SchemaRef {
+            self.0.schema.clone()
+        }
+
+        fn table_type(&self) -> TableType {
+            TableType::Temporary
+        }
+
+        async fn scan(
+            &self,
+            _state: &SessionState,
+            projection: Option<&Vec<usize>>,
+            _filters: &[Expr],
+            _limit: Option<usize>,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Ok(Arc::new(StreamingTableExec::try_new(
+                self.0.schema.clone(),
+                vec![Arc::new(FifoRead(self.0.clone())) as _],
+                projection,
+                self.0.sort.clone(),
+                true,
+            )?))
+        }
+
+        async fn insert_into(
+            &self,
+            _state: &SessionState,
+            input: Arc<dyn ExecutionPlan>,
+            _overwrite: bool,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            let ordering = match &self.0.sort {
+                Some(order) => Some(order.iter().map(|e| 
e.clone().into()).collect()),
+                None => None,
+            };
+
+            Ok(Arc::new(FileSinkExec::new(
+                input,
+                Arc::new(FifoWrite(self.0.clone())),
+                self.0.schema.clone(),
+                ordering,
+            )))
+        }
+    }
+
+    struct FifoRead(Arc<FifoConfig>);
+
+    impl PartitionStream for FifoRead {
+        fn schema(&self) -> &SchemaRef {
+            &self.0.schema
+        }
+
+        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream 
{
+            let config = self.0.clone();
+            let schema = self.0.schema.clone();
+            let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
+            let tx = builder.tx();
+            builder.spawn_blocking(move || {
+                let file = File::open(&config.location)?;
+                let reader = 
ReaderBuilder::new(config.schema.clone()).build(file)?;
+                for b in reader {
+                    if tx.blocking_send(b.map_err(Into::into)).is_err() {
+                        break;
+                    }
+                }
+                Ok(())
+            });
+            builder.build()
+        }
+    }
+
+    #[derive(Debug)]
+    struct FifoWrite(Arc<FifoConfig>);
+
+    impl DisplayAs for FifoWrite {
+        fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+            write!(f, "{self:?}")
+        }
+    }
+
+    #[async_trait]
+    impl DataSink for FifoWrite {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn metrics(&self) -> Option<MetricsSet> {
+            None
+        }
+
+        async fn write_all(
+            &self,
+            mut data: SendableRecordBatchStream,
+            _context: &Arc<TaskContext>,
+        ) -> Result<u64> {
+            let config = self.0.clone();
+            let (sender, mut receiver) = 
tokio::sync::mpsc::channel::<RecordBatch>(2);
+            // Note: FIFO Files support poll so this could use AsyncFd

Review Comment:
   This is an example of the potential benefits of a custom TableProvider: it can use external knowledge about the nature of the files in question (here, that FIFO files support `poll`, so this code could use `AsyncFd`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to