kou commented on code in PR #169:
URL: https://github.com/apache/arrow-go/pull/169#discussion_r1814063366


##########
arrow/cdata/cdata_exports.go:
##########
@@ -33,10 +33,23 @@ package cdata
 // void goReleaseSchema(struct ArrowSchema* schema) {
 //      releaseExportedSchema(schema);
 // }
+//
+// void goCallCancel(struct ArrowAsyncProducer* producer) {
+//  producer->cancel(producer);
+// }
+//
+// int goExtractTaskData(struct ArrowAsyncTask* task, struct ArrowDeviceArray* 
out) {
+//   return task->extract_data(task, out);

Review Comment:
   ```suggestion
   //   return task->extract_data(task, out);
   ```



##########
arrow/cdata/exports.go:
##########
@@ -25,16 +27,49 @@ import (
 )
 
 // #include <stdlib.h>
+// #include <errno.h>
+// #include "arrow/c/abi.h"
 // #include "arrow/c/helpers.h"
 //
 // typedef const char cchar_t;
 // extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
 // extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
 // extern const char* streamGetError(struct ArrowArrayStream*);
 // extern void streamRelease(struct ArrowArrayStream*);
+// extern int asyncStreamOnSchema(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowSchema*, char*);
+// extern int asyncStreamOnNextTask(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowAsyncTask*, char*);
+// extern void asyncStreamOnError(struct ArrowAsyncDeviceStreamHandler*, int, 
char*, char*);
+// extern void asyncStreamRelease(struct ArrowAsyncDeviceStreamHandler*);
+// extern void asyncProducerRequest(struct ArrowAsyncProducer*, int64_t);
+// extern void asyncProducerCancel(struct ArrowAsyncProducer*);
+// extern int asyncTaskExtract(struct ArrowAsyncTask*, struct 
ArrowDeviceArray*);
 // // XXX(https://github.com/apache/arrow-adbc/issues/729)
 // int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct 
ArrowSchema* out);
 // int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct 
ArrowArray* out);
+// int asyncTaskExtractTrampoline(struct ArrowAsyncTask* task, struct 
ArrowDeviceArray* out);
+//
+// static void goCallRequest(struct ArrowAsyncProducer* producer, int64_t n) {
+//     producer->request(producer, n);
+// }
+// static int goCallOnSchema(struct ArrowAsyncDeviceStreamHandler* handler, 
struct ArrowSchema* schema, char* metadata) {
+//   return handler->on_schema(handler, schema, metadata);
+// }
+// static void goCallOnError(struct ArrowAsyncDeviceStreamHandler* handler, 
int code, char* message, char* metadata) {
+//   handler->on_error(handler, code, message, metadata);
+// }
+// static int goCallOnNextTask(struct ArrowAsyncDeviceStreamHandler* handler, 
struct ArrowAsyncTask* task, char* metadata) {
+//   return handler->on_next_task(handler, task, metadata);
+// }
+//
+// static struct ArrowAsyncProducer* get_producer() {
+//   struct ArrowAsyncProducer* out = (struct 
ArrowAsyncProducer*)malloc(sizeof(struct ArrowAsyncProducer));
+//   memset(out, 0, sizeof(struct ArrowAsyncProducer));
+//   return out;

Review Comment:
   ```suggestion
   //           struct ArrowAsyncProducer* out = (struct 
ArrowAsyncProducer*)malloc(sizeof(struct ArrowAsyncProducer));
   //           memset(out, 0, sizeof(struct ArrowAsyncProducer));
   //           return out;
   ```



##########
arrow/cdata/cdata_exports.go:
##########
@@ -33,10 +33,23 @@ package cdata
 // void goReleaseSchema(struct ArrowSchema* schema) {
 //      releaseExportedSchema(schema);
 // }
+//
+// void goCallCancel(struct ArrowAsyncProducer* producer) {
+//  producer->cancel(producer);

Review Comment:
   ```suggestion
   //   producer->cancel(producer);
   ```



##########
arrow/cdata/exports.go:
##########
@@ -25,16 +27,49 @@ import (
 )
 
 // #include <stdlib.h>
+// #include <errno.h>
+// #include "arrow/c/abi.h"
 // #include "arrow/c/helpers.h"
 //
 // typedef const char cchar_t;
 // extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
 // extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
 // extern const char* streamGetError(struct ArrowArrayStream*);
 // extern void streamRelease(struct ArrowArrayStream*);
+// extern int asyncStreamOnSchema(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowSchema*, char*);
+// extern int asyncStreamOnNextTask(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowAsyncTask*, char*);
+// extern void asyncStreamOnError(struct ArrowAsyncDeviceStreamHandler*, int, 
char*, char*);
+// extern void asyncStreamRelease(struct ArrowAsyncDeviceStreamHandler*);
+// extern void asyncProducerRequest(struct ArrowAsyncProducer*, int64_t);
+// extern void asyncProducerCancel(struct ArrowAsyncProducer*);
+// extern int asyncTaskExtract(struct ArrowAsyncTask*, struct 
ArrowDeviceArray*);
 // // XXX(https://github.com/apache/arrow-adbc/issues/729)
 // int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct 
ArrowSchema* out);
 // int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct 
ArrowArray* out);
+// int asyncTaskExtractTrampoline(struct ArrowAsyncTask* task, struct 
ArrowDeviceArray* out);
+//
+// static void goCallRequest(struct ArrowAsyncProducer* producer, int64_t n) {
+//     producer->request(producer, n);

Review Comment:
   ```suggestion
   //   producer->request(producer, n);
   ```



##########
arrow/cdata/cdata_exports.go:
##########
@@ -482,3 +495,70 @@ func (rr cRecordReader) release() {
        }
        rr.rdr.Release()
 }
+
+type cAsyncStreamHandler struct {
+       producer  *CArrowAsyncProducer
+       taskQueue chan taskState
+       ctx       context.Context
+}
+
+func asyncTaskQueue(ctx context.Context, schema *arrow.Schema, recordStream 
chan<- RecordMessage, taskQueue <-chan taskState, producer 
*CArrowAsyncProducer) {
+       defer close(recordStream)
+       for {
+               select {
+               case <-ctx.Done():
+                       C.goCallCancel(producer)
+                       return
+               case task, ok := <-taskQueue:
+                       // if the queue closes or we receive a nil task, we're 
done
+                       if !ok || (task.err == nil && task.task.extract_data == 
nil) {
+                               return
+                       }
+
+                       if task.err != nil {
+                               recordStream <- RecordMessage{Err: task.err}
+                               continue
+                       }
+
+                       // request another batch now that we've processed this 
one
+                       C.goCallRequest(producer, C.int64_t(1))
+
+                       var out CArrowDeviceArray
+                       if C.goExtractTaskData(&task.task, &out) != C.int(0) {
+                               continue
+                       }
+
+                       rec, err := ImportCRecordBatchWithSchema(&out.array, 
schema)
+                       if err != nil {
+                               recordStream <- RecordMessage{Err: err}
+                       } else {
+                               recordStream <- RecordMessage{Record: rec, 
AdditionalMetadata: task.meta}
+                       }
+               }
+       }
+}
+
+func (h *cAsyncStreamHandler) onNextTask(task *CArrowAsyncTask, metadata 
*C.char) C.int {
+       if task == nil {
+               h.taskQueue <- taskState{}
+               return 0
+       }
+
+       ts := taskState{task: *task}
+       if metadata != nil {
+               ts.meta = decodeCMetadata(metadata)
+       }
+       h.taskQueue <- ts
+       return 0
+}
+
+func (h *cAsyncStreamHandler) onError(code C.int, message, metadata *C.char) {

Review Comment:
   I'm not familiar with Go, but it seems that `message`'s type is missing.



##########
arrow/cdata/cdata_test_framework.go:
##########
@@ -37,6 +37,13 @@ package cdata
 //     return out;
 // }
 //
+// static struct ArrowAsyncDeviceStreamHandler* get_test_async_handler() {
+//   struct ArrowAsyncDeviceStreamHandler* handler =
+//             (struct ArrowAsyncDeviceStreamHandler*)malloc(sizeof(struct 
ArrowAsyncDeviceStreamHandler));
+//      memset(handler, 0, sizeof(*handler));
+//      return handler;

Review Comment:
   ```suggestion
   //   struct ArrowAsyncDeviceStreamHandler* handler =
   //           (struct ArrowAsyncDeviceStreamHandler*)malloc(sizeof(struct 
ArrowAsyncDeviceStreamHandler));
   //   memset(handler, 0, sizeof(*handler));
   //   return handler;
   ```



##########
arrow/cdata/exports.go:
##########
@@ -25,16 +27,49 @@ import (
 )
 
 // #include <stdlib.h>
+// #include <errno.h>
+// #include "arrow/c/abi.h"
 // #include "arrow/c/helpers.h"
 //
 // typedef const char cchar_t;
 // extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
 // extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
 // extern const char* streamGetError(struct ArrowArrayStream*);
 // extern void streamRelease(struct ArrowArrayStream*);
+// extern int asyncStreamOnSchema(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowSchema*, char*);
+// extern int asyncStreamOnNextTask(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowAsyncTask*, char*);
+// extern void asyncStreamOnError(struct ArrowAsyncDeviceStreamHandler*, int, 
char*, char*);
+// extern void asyncStreamRelease(struct ArrowAsyncDeviceStreamHandler*);
+// extern void asyncProducerRequest(struct ArrowAsyncProducer*, int64_t);
+// extern void asyncProducerCancel(struct ArrowAsyncProducer*);
+// extern int asyncTaskExtract(struct ArrowAsyncTask*, struct 
ArrowDeviceArray*);
 // // XXX(https://github.com/apache/arrow-adbc/issues/729)
 // int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct 
ArrowSchema* out);
 // int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct 
ArrowArray* out);
+// int asyncTaskExtractTrampoline(struct ArrowAsyncTask* task, struct 
ArrowDeviceArray* out);
+//
+// static void goCallRequest(struct ArrowAsyncProducer* producer, int64_t n) {
+//     producer->request(producer, n);
+// }
+// static int goCallOnSchema(struct ArrowAsyncDeviceStreamHandler* handler, 
struct ArrowSchema* schema, char* metadata) {
+//   return handler->on_schema(handler, schema, metadata);

Review Comment:
   ```suggestion
   //           return handler->on_schema(handler, schema, metadata);
   ```



##########
arrow/cdata/exports.go:
##########
@@ -155,3 +190,229 @@ func exportStream(rdr array.RecordReader, out 
*CArrowArrayStream) {
        h := cgo.NewHandle(cRecordReader{rdr: rdr, err: nil})
        out.private_data = createHandle(h)
 }
+
+type cAsyncState struct {
+       ch        chan AsyncRecordBatchStream
+       queueSize uint64
+       ctx       context.Context
+}
+
+type taskState struct {
+       task CArrowAsyncTask
+       meta arrow.Metadata
+       err  error
+}
+
+//export asyncStreamOnSchema
+func asyncStreamOnSchema(self *CArrowAsyncDeviceStreamHandler, schema 
*CArrowSchema, addlMetadata *C.char) C.int {

Review Comment:
   What does "addl" in "addlMetadata" mean?
   I saw https://github.com/apache/arrow/pull/43632/files#r1769172792, but I 
couldn't find an explanation of the abbreviation there...



##########
arrow/cdata/exports.go:
##########
@@ -25,16 +27,49 @@ import (
 )
 
 // #include <stdlib.h>
+// #include <errno.h>
+// #include "arrow/c/abi.h"
 // #include "arrow/c/helpers.h"
 //
 // typedef const char cchar_t;
 // extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
 // extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
 // extern const char* streamGetError(struct ArrowArrayStream*);
 // extern void streamRelease(struct ArrowArrayStream*);
+// extern int asyncStreamOnSchema(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowSchema*, char*);
+// extern int asyncStreamOnNextTask(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowAsyncTask*, char*);
+// extern void asyncStreamOnError(struct ArrowAsyncDeviceStreamHandler*, int, 
char*, char*);
+// extern void asyncStreamRelease(struct ArrowAsyncDeviceStreamHandler*);
+// extern void asyncProducerRequest(struct ArrowAsyncProducer*, int64_t);
+// extern void asyncProducerCancel(struct ArrowAsyncProducer*);
+// extern int asyncTaskExtract(struct ArrowAsyncTask*, struct 
ArrowDeviceArray*);
 // // XXX(https://github.com/apache/arrow-adbc/issues/729)
 // int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct 
ArrowSchema* out);
 // int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct 
ArrowArray* out);
+// int asyncTaskExtractTrampoline(struct ArrowAsyncTask* task, struct 
ArrowDeviceArray* out);
+//
+// static void goCallRequest(struct ArrowAsyncProducer* producer, int64_t n) {
+//     producer->request(producer, n);
+// }
+// static int goCallOnSchema(struct ArrowAsyncDeviceStreamHandler* handler, 
struct ArrowSchema* schema, char* metadata) {
+//   return handler->on_schema(handler, schema, metadata);
+// }
+// static void goCallOnError(struct ArrowAsyncDeviceStreamHandler* handler, 
int code, char* message, char* metadata) {
+//   handler->on_error(handler, code, message, metadata);
+// }
+// static int goCallOnNextTask(struct ArrowAsyncDeviceStreamHandler* handler, 
struct ArrowAsyncTask* task, char* metadata) {
+//   return handler->on_next_task(handler, task, metadata);

Review Comment:
   ```suggestion
   //           return handler->on_next_task(handler, task, metadata);
   ```



##########
arrow/cdata/exports.go:
##########
@@ -25,16 +27,49 @@ import (
 )
 
 // #include <stdlib.h>
+// #include <errno.h>
+// #include "arrow/c/abi.h"
 // #include "arrow/c/helpers.h"
 //
 // typedef const char cchar_t;
 // extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
 // extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
 // extern const char* streamGetError(struct ArrowArrayStream*);
 // extern void streamRelease(struct ArrowArrayStream*);
+// extern int asyncStreamOnSchema(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowSchema*, char*);
+// extern int asyncStreamOnNextTask(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowAsyncTask*, char*);
+// extern void asyncStreamOnError(struct ArrowAsyncDeviceStreamHandler*, int, 
char*, char*);
+// extern void asyncStreamRelease(struct ArrowAsyncDeviceStreamHandler*);
+// extern void asyncProducerRequest(struct ArrowAsyncProducer*, int64_t);
+// extern void asyncProducerCancel(struct ArrowAsyncProducer*);
+// extern int asyncTaskExtract(struct ArrowAsyncTask*, struct 
ArrowDeviceArray*);
 // // XXX(https://github.com/apache/arrow-adbc/issues/729)
 // int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct 
ArrowSchema* out);
 // int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct 
ArrowArray* out);
+// int asyncTaskExtractTrampoline(struct ArrowAsyncTask* task, struct 
ArrowDeviceArray* out);
+//
+// static void goCallRequest(struct ArrowAsyncProducer* producer, int64_t n) {
+//     producer->request(producer, n);
+// }
+// static int goCallOnSchema(struct ArrowAsyncDeviceStreamHandler* handler, 
struct ArrowSchema* schema, char* metadata) {
+//   return handler->on_schema(handler, schema, metadata);
+// }
+// static void goCallOnError(struct ArrowAsyncDeviceStreamHandler* handler, 
int code, char* message, char* metadata) {
+//   handler->on_error(handler, code, message, metadata);

Review Comment:
   ```suggestion
   //           handler->on_error(handler, code, message, metadata);
   ```



##########
arrow/cdata/exports.go:
##########
@@ -155,3 +190,229 @@ func exportStream(rdr array.RecordReader, out 
*CArrowArrayStream) {
        h := cgo.NewHandle(cRecordReader{rdr: rdr, err: nil})
        out.private_data = createHandle(h)
 }
+
+type cAsyncState struct {
+       ch        chan AsyncRecordBatchStream
+       queueSize uint64
+       ctx       context.Context
+}
+
+type taskState struct {
+       task CArrowAsyncTask
+       meta arrow.Metadata
+       err  error
+}
+
+//export asyncStreamOnSchema
+func asyncStreamOnSchema(self *CArrowAsyncDeviceStreamHandler, schema 
*CArrowSchema, addlMetadata *C.char) C.int {
+       h := getHandle(self.private_data)
+       handler := h.Value().(cAsyncState)
+       defer close(handler.ch)
+
+       if self.producer.device_type != C.ARROW_DEVICE_CPU {
+               handler.ch <- AsyncRecordBatchStream{Err: 
fmt.Errorf("unsupported device type")}
+               return C.EINVAL
+       }
+
+       sc, err := ImportCArrowSchema(schema)
+       if err != nil {
+               handler.ch <- AsyncRecordBatchStream{Err: err}
+               return C.EINVAL
+       }
+
+       var meta arrow.Metadata
+       if addlMetadata != nil {
+               meta = decodeCMetadata(addlMetadata)
+       }
+
+       recordStream := make(chan RecordMessage, handler.queueSize)
+       taskQueue := make(chan taskState, handler.queueSize)
+       handler.ch <- AsyncRecordBatchStream{Schema: sc,
+               AdditionalMetadata: meta, Stream: recordStream}

Review Comment:
   Does "addl" mean "additional"!?



##########
arrow/cdata/cdata_exports.go:
##########
@@ -33,10 +33,23 @@ package cdata
 // void goReleaseSchema(struct ArrowSchema* schema) {
 //      releaseExportedSchema(schema);
 // }
+//
+// void goCallCancel(struct ArrowAsyncProducer* producer) {
+//  producer->cancel(producer);
+// }
+//
+// int goExtractTaskData(struct ArrowAsyncTask* task, struct ArrowDeviceArray* 
out) {
+//   return task->extract_data(task, out);
+// }
+//
+// static void goCallRequest(struct ArrowAsyncProducer* producer, int64_t n) {
+//     producer->request(producer, n);

Review Comment:
   ```suggestion
   //   producer->request(producer, n);
   ```



##########
arrow/cdata/exports.go:
##########
@@ -25,16 +27,49 @@ import (
 )
 
 // #include <stdlib.h>
+// #include <errno.h>
+// #include "arrow/c/abi.h"
 // #include "arrow/c/helpers.h"
 //
 // typedef const char cchar_t;
 // extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
 // extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
 // extern const char* streamGetError(struct ArrowArrayStream*);
 // extern void streamRelease(struct ArrowArrayStream*);
+// extern int asyncStreamOnSchema(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowSchema*, char*);
+// extern int asyncStreamOnNextTask(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowAsyncTask*, char*);
+// extern void asyncStreamOnError(struct ArrowAsyncDeviceStreamHandler*, int, 
char*, char*);
+// extern void asyncStreamRelease(struct ArrowAsyncDeviceStreamHandler*);
+// extern void asyncProducerRequest(struct ArrowAsyncProducer*, int64_t);
+// extern void asyncProducerCancel(struct ArrowAsyncProducer*);
+// extern int asyncTaskExtract(struct ArrowAsyncTask*, struct 
ArrowDeviceArray*);
 // // XXX(https://github.com/apache/arrow-adbc/issues/729)
 // int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct 
ArrowSchema* out);
 // int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct 
ArrowArray* out);
+// int asyncTaskExtractTrampoline(struct ArrowAsyncTask* task, struct 
ArrowDeviceArray* out);
+//
+// static void goCallRequest(struct ArrowAsyncProducer* producer, int64_t n) {
+//     producer->request(producer, n);
+// }
+// static int goCallOnSchema(struct ArrowAsyncDeviceStreamHandler* handler, 
struct ArrowSchema* schema, char* metadata) {
+//   return handler->on_schema(handler, schema, metadata);
+// }
+// static void goCallOnError(struct ArrowAsyncDeviceStreamHandler* handler, 
int code, char* message, char* metadata) {
+//   handler->on_error(handler, code, message, metadata);
+// }
+// static int goCallOnNextTask(struct ArrowAsyncDeviceStreamHandler* handler, 
struct ArrowAsyncTask* task, char* metadata) {
+//   return handler->on_next_task(handler, task, metadata);
+// }
+//
+// static struct ArrowAsyncProducer* get_producer() {
+//   struct ArrowAsyncProducer* out = (struct 
ArrowAsyncProducer*)malloc(sizeof(struct ArrowAsyncProducer));
+//   memset(out, 0, sizeof(struct ArrowAsyncProducer));
+//   return out;
+// }
+//
+// static void goReleaseAsyncHandler(struct ArrowAsyncDeviceStreamHandler* 
handler) {
+//      handler->release(handler);

Review Comment:
   ```suggestion
   //   handler->release(handler);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to