alamb commented on code in PR #3311:
URL: https://github.com/apache/arrow-datafusion/pull/3311#discussion_r962294364


##########
datafusion/core/src/datasource/datasource.rs:
##########
@@ -72,3 +72,9 @@ pub trait TableProvider: Sync + Send {
         Ok(TableProviderFilterPushDown::Unsupported)
     }
 }
+
+/// A factory which creates TableProviders given appropriate information

Review Comment:
   ```suggestion
   /// A factory which creates [`TableProvider`]s at runtime given a URL.
   /// 
   /// For example, this can be used to create a table "on the fly" 
   /// from a directory of files only when that name is referenced.  
   ```



##########
datafusion/core/src/execution/context.rs:
##########
@@ -477,6 +429,88 @@ impl SessionContext {
         }
     }
 
+    async fn create_custom_table(
+        &self,
+        cmd: &CreateExternalTable,
+    ) -> Result<Arc<DataFrame>> {
+        let factory = &self.table_factories.get(&cmd.file_type).ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "Unable to find factory for {}",
+                cmd.file_type
+            ))
+        })?;
+        let table = (*factory).create(cmd.name.as_str(), 
cmd.location.as_str());
+        let _ = self.register_table(cmd.name.as_str(), table)?;

Review Comment:
   ```suggestion
            self.register_table(cmd.name.as_str(), table)?;
   ```



##########
.github/workflows/rust.yml:
##########
@@ -277,7 +277,9 @@ jobs:
           rustup default stable
           rustup component add rustfmt
       - name: Run
-        run: ci/scripts/rust_fmt.sh
+        run: |
+          echo '' > datafusion/proto/src/generated/datafusion.rs

Review Comment:
   I think this is a leftover from
https://github.com/apache/arrow-datafusion/pull/3333, but I also don't think
it hurts to leave it in this PR.



##########
datafusion/sql/src/parser.rs:
##########
@@ -56,7 +46,7 @@ pub struct CreateExternalTable {
     /// Optional schema
     pub columns: Vec<ColumnDef>,
     /// File type (Parquet, NDJSON, CSV)

Review Comment:
   ```suggestion
       /// File type (Parquet, NDJSON, CSV, etc.)
   ```



##########
datafusion/proto/src/logical_plan.rs:
##########
@@ -491,14 +461,23 @@ impl AsLogicalPlan for LogicalPlanNode {
                     ))
                 })?;
 
-                let pb_file_type: protobuf::FileType =
-                    create_extern_table.file_type.try_into()?;
+                match create_extern_table.file_type.as_str() {
+                    "CSV" | "JSON" | "PARQUET" | "AVRO" => {}

Review Comment:
   It might be nice to avoid hardcoding these four strings in several
places, so that adding another format doesn't require changes all over
the place.
   
   Maybe we could put them into a struct like `BuiltInTableProviders` or something similar.



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1214,19 +1214,6 @@ pub struct CreateView {
     pub definition: Option<String>,
 }
 
-/// Types of files to parse as DataFrames

Review Comment:
   Another potential way to write this (and maybe reduce the diff) would be to
add a new enum variant like `Dynamic`:
   
   ```rust
   pub enum FileType {
       /// Newline-delimited JSON
       NdJson,
    ...
      /// Custom factory
      Dynamic(String),
   }
   ```
   
   This strategy might be more "idiomatic" because it encodes more information in
the type system rather than in a `String`.
   
   I don't feel strongly about this; I just wanted to offer it as an option.



##########
datafusion/core/src/execution/context.rs:
##########
@@ -210,9 +214,19 @@ impl SessionContext {
             session_id: state.session_id.clone(),
             session_start_time: chrono::Utc::now(),
             state: Arc::new(RwLock::new(state)),
+            table_factories: HashMap::default(),
         }
     }
 
+    /// Register a `TableProviderFactory` for a given `file_type` identifier
+    pub fn register_table_factory(
+        &mut self,
+        file_type: &str,
+        factory: Arc<dyn TableProviderFactory>,
+    ) {
+        let _ = self.table_factories.insert(file_type.to_string(), factory);

Review Comment:
   ```suggestion
           self.table_factories.insert(file_type.to_string(), factory);
   ```
   
   



##########
datafusion/core/src/execution/context.rs:
##########
@@ -477,6 +429,88 @@ impl SessionContext {
         }
     }
 
+    async fn create_custom_table(
+        &self,
+        cmd: &CreateExternalTable,
+    ) -> Result<Arc<DataFrame>> {
+        let factory = &self.table_factories.get(&cmd.file_type).ok_or_else(|| {
+            DataFusionError::Internal(format!(

Review Comment:
   I don't think this is an internal error -- it can happen when the user 
provides a bad query, right? How about `DataFusionError::Execution` instead?



##########
datafusion/core/tests/sql/create_drop.rs:
##########
@@ -262,6 +267,77 @@ async fn create_pipe_delimited_csv_table() -> Result<()> {
     Ok(())
 }
 
+struct TestTableProvider {}
+
+impl TestTableProvider {}
+
+#[async_trait]
+impl TableProvider for TestTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        unimplemented!("TestTableProvider is a stub for testing.")
+    }
+
+    fn schema(&self) -> SchemaRef {
+        unimplemented!("TestTableProvider is a stub for testing.")
+    }
+
+    fn table_type(&self) -> TableType {
+        unimplemented!("TestTableProvider is a stub for testing.")
+    }
+
+    async fn scan(
+        &self,
+        _ctx: &SessionState,
+        _projection: &Option<Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        unimplemented!("TestTableProvider is a stub for testing.")
+    }
+}
+
+struct TestTableFactory {}
+
+impl TableProviderFactory for TestTableFactory {
+    fn create(&self, _name: &str, _path: &str) -> Arc<dyn TableProvider> {
+        Arc::new(TestTableProvider {})
+    }
+}
+
+#[tokio::test]
+async fn create_custom_table() -> Result<()> {
+    let mut ctx = SessionContext::new();
+    ctx.register_table_factory("DELTATABLE", Arc::new(TestTableFactory {}));
+
+    let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 
's3://bucket/schema/table';";
+    ctx.sql(sql).await.unwrap();
+
+    let cat = ctx.catalog("datafusion").unwrap();
+    let schema = cat.schema("public").unwrap();
+    let exists = schema.table_exist("dt");
+    assert!(exists, "Table should have been created!");
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn create_bad_custom_table() {
+    let ctx = SessionContext::new();
+
+    let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 
's3://bucket/schema/table';";
+    let res = ctx.sql(sql).await;
+    match res {
+        Ok(_) => panic!("Registration of tables without factories should 
fail"),
+        Err(e) => {
+            if !e.to_string().contains("Unable to find factory for") {
+                panic!(
+                    "Registration of tables without factories should throw 
correct error"
+                )

Review Comment:
   You can also use `assert!` here, like this:
   
   ```suggestion
               assert!(e.to_string().contains("Unable to find factory for"), 
"Registration of tables without factories should throw correct error")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to