kou commented on code in PR #169:
URL: https://github.com/apache/arrow-go/pull/169#discussion_r1814066144
##########
arrow/cdata/cdata_exports.go:
##########
@@ -482,3 +495,70 @@ func (rr cRecordReader) release() {
}
rr.rdr.Release()
}
+
+type cAsyncStreamHandler struct {
+ producer *CArrowAsyncProducer
+ taskQueue chan taskState
+ ctx context.Context
+}
+
+func asyncTaskQueue(ctx context.Context, schema *arrow.Schema, recordStream
chan<- RecordMessage, taskQueue <-chan taskState, producer
*CArrowAsyncProducer) {
+ defer close(recordStream)
+ for {
+ select {
+ case <-ctx.Done():
+ C.goCallCancel(producer)
+ return
+ case task, ok := <-taskQueue:
+ // if the queue closes or we receive a nil task, we're
done
+ if !ok || (task.err == nil && task.task.extract_data ==
nil) {
+ return
+ }
+
+ if task.err != nil {
+ recordStream <- RecordMessage{Err: task.err}
+ continue
+ }
+
+ // request another batch now that we've processed this
one
+ C.goCallRequest(producer, C.int64_t(1))
+
+ var out CArrowDeviceArray
+ if C.goExtractTaskData(&task.task, &out) != C.int(0) {
+ continue
+ }
+
+ rec, err := ImportCRecordBatchWithSchema(&out.array,
schema)
+ if err != nil {
+ recordStream <- RecordMessage{Err: err}
+ } else {
+ recordStream <- RecordMessage{Record: rec,
AdditionalMetadata: task.meta}
+ }
+ }
+ }
+}
+
+func (h *cAsyncStreamHandler) onNextTask(task *CArrowAsyncTask, metadata
*C.char) C.int {
+ if task == nil {
+ h.taskQueue <- taskState{}
+ return 0
+ }
+
+ ts := taskState{task: *task}
+ if metadata != nil {
+ ts.meta = decodeCMetadata(metadata)
+ }
+ h.taskQueue <- ts
+ return 0
+}
+
+func (h *cAsyncStreamHandler) onError(code C.int, message, metadata *C.char) {
Review Comment:
I'm not familiar with Go, but it seems that `message`'s type is missing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org