alamb commented on code in PR #4101:
URL: https://github.com/apache/arrow-rs/pull/4101#discussion_r1175732070


##########
arrow-flight/src/encode.rs:
##########
@@ -194,17 +211,28 @@ impl FlightDataEncoder {
             app_metadata: Some(app_metadata),
             queue: VecDeque::new(),
             done: false,
+            descriptor: None,
         };
 
         // If schema is known up front, enqueue it immediately
         if let Some(schema) = schema {
             encoder.encode_schema(&schema);
         }
+
+        encoder.descriptor = descriptor;

Review Comment:
   I think this should be set before calling `encoder.encode_schema()` so it is 
included on the first Schema message, if provided



##########
arrow-flight/tests/encode_decode.rs:
##########
@@ -136,6 +138,31 @@ async fn test_zero_batches_schema_specified() {
     assert_eq!(decoder.schema(), Some(&schema));
 }
 
+#[tokio::test]
+async fn test_with_flight_descriptor() {
+    let stream = futures::stream::iter(vec![Ok(make_dictionary_batch(5))]);
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, 
true)]));
+
+    let descriptor = Some(FlightDescriptor {
+        r#type: DescriptorType::Path.into(),
+        path: vec!["table_name".to_string()],
+        cmd: Bytes::default(),
+    });
+
+    let encoder = FlightDataEncoderBuilder::default()
+        .with_schema(schema.clone())
+        .with_flight_descriptor(descriptor.clone());
+
+    let mut encoder = encoder.build(stream);
+
+    // First batch should be the schema
+    encoder.next().await.unwrap().unwrap();

Review Comment:
   My understanding of 
https://arrow.apache.org/docs/format/Flight.html#uploading-data was that the 
first `FlightData` should have the `descriptor`, not the second 🤔 



##########
arrow-flight/src/encode.rs:
##########
@@ -194,17 +211,28 @@ impl FlightDataEncoder {
             app_metadata: Some(app_metadata),
             queue: VecDeque::new(),
             done: false,
+            descriptor: None,
         };
 
         // If schema is known up front, enqueue it immediately
         if let Some(schema) = schema {
             encoder.encode_schema(&schema);
         }
+
+        encoder.descriptor = descriptor;
+
         encoder
     }
 
     /// Place the `FlightData` in the queue to send
     fn queue_message(&mut self, data: FlightData) {
+        if let Some(descriptor) = &self.descriptor {
+            let mut data = data;
+            data.flight_descriptor = Some(descriptor.clone());
+            self.descriptor = None;
+            self.queue.push_back(data);
+            return;
+        }
         self.queue.push_back(data);
     }

Review Comment:
   FWIW I think you can write this using `take()` a bit more concisely like:
   
   ```suggestion
       fn queue_message(&mut self, mut data: FlightData) {
           if let Some(descriptor) = self.descriptor.take() {
               data.flight_descriptor = Some(descriptor.clone());
           }
           self.queue.push_back(data);
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to