lidavidm commented on code in PR #169:
URL: https://github.com/apache/arrow-go/pull/169#discussion_r1814178164
##########
arrow/cdata/cdata_exports.go:
##########
@@ -482,3 +495,70 @@ func (rr cRecordReader) release() {
}
rr.rdr.Release()
}
+
+type cAsyncStreamHandler struct {
+ producer *CArrowAsyncProducer
+ taskQueue chan taskState
+ ctx context.Context
+}
+
+func asyncTaskQueue(ctx context.Context, schema *arrow.Schema, recordStream
chan<- RecordMessage, taskQueue <-chan taskState, producer
*CArrowAsyncProducer) {
+ defer close(recordStream)
+ for {
+ select {
+ case <-ctx.Done():
+ C.goCallCancel(producer)
+ return
+ case task, ok := <-taskQueue:
+ // if the queue closes or we receive a nil task, we're
done
+ if !ok || (task.err == nil && task.task.extract_data ==
nil) {
+ return
+ }
+
+ if task.err != nil {
+ recordStream <- RecordMessage{Err: task.err}
+ continue
+ }
+
+ // request another batch now that we've processed this
one
+ C.goCallRequest(producer, C.int64_t(1))
+
+ var out CArrowDeviceArray
+ if C.goExtractTaskData(&task.task, &out) != C.int(0) {
+ continue
+ }
+
+ rec, err := ImportCRecordBatchWithSchema(&out.array,
schema)
+ if err != nil {
+ recordStream <- RecordMessage{Err: err}
+ } else {
+ recordStream <- RecordMessage{Record: rec,
AdditionalMetadata: task.meta}
+ }
+ }
+ }
+}
+
+func (h *cAsyncStreamHandler) onNextTask(task *CArrowAsyncTask, metadata
*C.char) C.int {
+ if task == nil {
+ h.taskQueue <- taskState{}
+ return 0
+ }
+
+ ts := taskState{task: *task}
+ if metadata != nil {
+ ts.meta = decodeCMetadata(metadata)
+ }
+ h.taskQueue <- ts
+ return 0
+}
+
+func (h *cAsyncStreamHandler) onError(code C.int, message, metadata *C.char) {
Review Comment:
Confusingly in Go this declares two parameters of type `*C.char`. I can't
find it clearly explained in the spec, but you can see more examples here:
https://go.dev/ref/spec#Function_declarations
Also here:
https://www.reddit.com/r/golang/comments/gsqiee/multiple_arguments_of_the_same_type_in_a_function/
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]