lidavidm commented on code in PR #169:
URL: https://github.com/apache/arrow-go/pull/169#discussion_r1814055905


##########
arrow/cdata/interface.go:
##########
@@ -282,3 +283,62 @@ func ReleaseCArrowArray(arr *CArrowArray) { 
releaseArr(arr) }
 
 // ReleaseCArrowSchema calls ArrowSchemaRelease on the passed in cdata schema
 func ReleaseCArrowSchema(schema *CArrowSchema) { releaseSchema(schema) }
+
+// RecordMessage is a simple container for a single message sent over a record
+// batch channel when streaming through the Async C Data Interface via ExportAsyncRecordBatchStream.
+type RecordMessage struct {
+       Record             arrow.Record
+       AdditionalMetadata arrow.Metadata
+       Err                error
+}
+
+// AsyncRecordBatchStream represents a stream of record batches being read in
+// from an ArrowAsyncDeviceStreamHandler's callbacks. If an error was 
encountered
+// before the call to on_schema, then this will contain the error as Err. 
Otherwise
+// the Schema will be valid and the Stream is a channel of RecordMessages being
+// propagated via on_next_task and extract_data.
+type AsyncRecordBatchStream struct {
+       Schema             *arrow.Schema
+       AdditionalMetadata arrow.Metadata
+       Err                error
+       Stream             <-chan RecordMessage
+}
+
+// AsyncStreamError represents an error encountered via a call to the on_error
+// callback of an ArrowAsyncDeviceStreamHandler. The Code is the error code 
that
+// should be errno compatible.
+type AsyncStreamError struct {
+       Code     int
+       Msg      string
+       Metadata string
+}
+
+func (e AsyncStreamError) Error() string { return e.Msg }
+
+// CreateAsyncDeviceStreamHandler populates a given 
ArrowAsyncDeviceStreamHandler's callbacks
+// and waits for the on_schema callback to be called before passing the 
AsyncRecordBatchStream
+// object across the returned channel.
+//
+// The provided queueSize is the number of records that will be requested at a 
time to be passed
+// along the Stream in the returned AsyncRecordBatchStream. See the 
documentation on
+// https://arrow.apache.org/docs/format/CDeviceDataInterface.html for more 
information as to the
+// expected semantics of that size.
+//
+// The populated ArrowAsyncDeviceStreamHandler can then be given to any 
compatible provider for
+// async record batch streams via the C Device interface.
+func CreateAsyncDeviceStreamHandler(ctx context.Context, queueSize uint64, out 
*CArrowAsyncDeviceStreamHandler) <-chan AsyncRecordBatchStream {
+       ch := make(chan AsyncRecordBatchStream)
+       exportAsyncHandler(cAsyncState{ctx: ctx, ch: ch, queueSize: queueSize}, 
out)
+       return ch
+}
+
+// ExportAsyncRecordBatchStream takes in a schema and a channel of 
RecordMessages along with an
+// ArrowAsyncDeviceStreamHandler to export the records as they come across the 
channel and call
+// the appropriate callbacks on the handler. This function will block until 
the stream is closed
+// or a message containing an error comes across the channel.
+//
+// The returned error will be nil if everything is successful, otherwise it 
will be the error which
+// is encountered on the stream or an AsyncStreamError if one of the handler 
callbacks returns an error.
+func ExportAsyncRecordBatchStream(schema *arrow.Schema, stream <-chan 
RecordMessage, handler *CArrowAsyncDeviceStreamHandler) error {

Review Comment:
   One of these functions blocks (ExportAsyncRecordBatchStream) while the other 
returns a channel (CreateAsyncDeviceStreamHandler); is there a reason not to 
have them both block or both use a channel?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to