jorgecarleitao commented on pull request #8968:
URL: https://github.com/apache/arrow/pull/8968#issuecomment-748628110
Hey @kflansburg
Thanks for the clarifications and for explaining the rationale and use-cases. I am convinced that there is a good case for using Arrow with Kafka, and I would like to thank you for clarifying my view on this topic 👍
> According to the Kafka API, the payload is an opaque binary (which I might add is a supported data type in Arrow). [...]
Maybe it is easier to illustrate this with an example: let's say that the
payload for a given topic is JSON with the schema `{"value": int32}`. If I
understand you correctly, the idea of this PR is to build a `RecordBatch` with
```
Field("message",
    DataType::Struct([
        Field("payload", DataType::Binary),
        Field("topic", DataType::Utf8),
        ...
    ])
)
```
and then convert it to something like
```
Field("message",
    DataType::Struct([
        Field("payload", DataType::Struct([Field("value", DataType::Int32)])),
        Field("topic", DataType::Utf8),
        ...
    ])
)
```
using something like a `to_json` conversion. Is this the direction you have in mind?
If yes, note that creating a `BinaryArray` requires a complete `memcopy` of the `Vec<Vec<u8>>`, which is expensive. This is what I meant by "introducing unnecessary complexity". Wouldn't it be simpler to keep the `KafkaBatch` as is and use arrow's JSON reader to convert the payload blobs directly into the corresponding arrow arrays?
Which brings me back to my initial question in item 3: IMO the core issue is not interoperability with `KafkaBatch`, but how to deserialize the payload into Arrow. This PR focuses on converting `KafkaBatch -> RecordBatch`, and I am trying to understand the benefit of memcopying the whole payload from bytes into a `BinaryArray` instead of keeping the `KafkaBatch` as is.
Wrt `async`: fearless concurrency ftw :) `DataFusion` already supports `async` scans, and Kafka seems to be a good fit for that. In this context, if I were doing this, my strategy would be to write a `DataFusion` node that `async`-polls a Kafka stream with the following configuration:
* topic
* expected payload format (e.g. JSON for start)
* a number of rows per Batch (i.e. buffering)
* expected schema
and returns `RecordBatch`es with the payload deserialized according to the expected schema (using the corresponding converter); a rough sketch of what such a node could look like is below.
Does this make sense?
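Something along these lines, very roughly. None of these names (`KafkaScanConfig`, `PayloadFormat`, `MessageSource`, `poll_batch`) exist in DataFusion or rdkafka; they only illustrate the shape of the node, and it reuses the `payloads_to_batch` sketch from above as the JSON converter:
```
// Hand-wavy sketch of the configuration and polling loop of a hypothetical
// Kafka scan node; not an existing DataFusion or rdkafka API.
use arrow::datatypes::SchemaRef;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Expected payload format; JSON would be a reasonable starting point.
enum PayloadFormat {
    Json,
}

/// Configuration of the hypothetical Kafka scan node.
struct KafkaScanConfig {
    topic: String,
    format: PayloadFormat,
    /// Number of messages to buffer before emitting a RecordBatch.
    batch_size: usize,
    /// Schema the payloads are expected to deserialize into.
    schema: SchemaRef,
}

/// Abstraction over the Kafka consumer (e.g. an rdkafka stream consumer),
/// so that the sketch does not depend on a concrete client API.
#[async_trait::async_trait]
trait MessageSource {
    /// Returns the next message payload, or `None` when the stream ends.
    async fn next_payload(&mut self) -> Option<Vec<u8>>;
}

/// Buffer up to `batch_size` payloads and convert them into a RecordBatch
/// with the converter that corresponds to the configured format.
async fn poll_batch(
    config: &KafkaScanConfig,
    source: &mut dyn MessageSource,
) -> Result<Option<RecordBatch>> {
    let mut payloads = Vec::with_capacity(config.batch_size);
    while payloads.len() < config.batch_size {
        match source.next_payload().await {
            Some(payload) => payloads.push(payload),
            None => break,
        }
    }
    if payloads.is_empty() {
        return Ok(None);
    }
    let batch = match config.format {
        PayloadFormat::Json => payloads_to_batch(&payloads)?,
    };
    // A real node would also validate batch.schema() against config.schema
    // before handing the batch to downstream operators.
    Ok(Some(batch))
}
```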