silence-coding opened a new issue #2058:
URL: https://github.com/apache/arrow-datafusion/issues/2058


   panicked at 'attempt to subtract with overflow', datafusion-7.0.0/src/datasource/file_format/json.rs:71:17
   
   It appears that schema_infer_max_rec must be at least the number of data rows: when the file contains more rows than the limit, the take_while closure runs one more time after records_to_read has already reached 0, and the unconditional records_to_read -= 1 underflows.
   ```
       async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
           let mut schemas = Vec::new();
           let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
           while let Some(obj_reader) = readers.next().await {
               let mut reader = BufReader::new(obj_reader?.sync_reader()?);
               let iter = ValueIter::new(&mut reader, None);
               let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
                   let should_take = records_to_read > 0;
                   records_to_read -= 1;   // panicked at 'attempt to subtract with overflow'
                   should_take
               }))?;
               if records_to_read == 0 {
                   break;
               }
               schemas.push(schema);
           }
   
           let schema = Schema::try_merge(schemas)?;
           Ok(Arc::new(schema))
       }
   ```
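
   For illustration only (this is not the actual fix in DataFusion, just a sketch of the idea), the underflow can be avoided by decrementing the budget only while it is still positive:

   ```
   // Standalone sketch: the same take_while pattern as infer_schema above,
   // but with the decrement guarded so it can never underflow.
   fn main() {
       let records = vec!["a", "b", "c", "d", "e"];
       let mut records_to_read: usize = 3; // stand-in for schema_infer_max_rec

       let taken: Vec<_> = records
           .into_iter()
           .take_while(|_| {
               let should_take = records_to_read > 0;
               if should_take {
                   // only subtract while the budget is positive, so 0 - 1 never happens
                   records_to_read -= 1;
               }
               should_take
           })
           .collect();

       assert_eq!(taken, vec!["a", "b", "c"]);
       println!("took {:?}, remaining budget {}", taken, records_to_read);
   }
   ```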
   
   Here's my test code.
   ```
   use std::sync::Arc;
   use datafusion::datasource::file_format::json::JsonFormat;
   use datafusion::datasource::listing::ListingOptions;
   use datafusion::prelude::ExecutionContext;
   use futures::StreamExt;
   
   pub async fn read_json() -> datafusion::error::Result<()> {
       let mut ctx = ExecutionContext::new();
       println!("----------------");
       // panics here: schema inference runs while the table is registered,
       // and the JSON file under `tests` has more rows than the limit of 3 set below
       register_json(&mut ctx, "example", "tests").await;
       let df = ctx.sql("SELECT * FROM example limit 7").await?;
       let partitions = df.execute_stream_partitioned().await.unwrap();
       for mut partition in partitions {
           while let Some(batch) = partition.next().await {
               println!("{:?}", batch);
           }
       }
       println!("----------------");
       Ok(())
   }
   
   async fn register_json(ctx: &mut ExecutionContext, name: &str, uri: &str) {
       let target_partitions = {
           let m = ctx.state.lock();
           m.config.target_partitions
       };
       // limit schema inference to 3 records; the data file contains more rows than this
       let file_format = JsonFormat::default().with_schema_infer_max_rec(Some(3));
       let listing_options = ListingOptions {
           format: Arc::new(file_format),
           collect_stat: false,
           file_extension: ".json".to_owned(),
           target_partitions,
           table_partition_cols: vec![],
       };
       ctx.register_listing_table(name, uri, listing_options, None)
           .await
           .unwrap();
   }
   ```
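
   As a workaround, based on the setter used above, passing None (no limit) or a limit at least as large as the number of rows in the file avoids the underflow (the helper name below is just for illustration):

   ```
   use datafusion::datasource::file_format::json::JsonFormat;
   
   // Workaround sketch: infer the schema with no record limit
   // (or use Some(n) where n >= the number of rows in the JSON file).
   fn unlimited_json_format() -> JsonFormat {
       JsonFormat::default().with_schema_infer_max_rec(None)
   }
   ```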

