kflansburg commented on pull request #8968:
URL: https://github.com/apache/arrow/pull/8968#issuecomment-748633120
> Maybe it is easier to illustrate this with an example: let's say that the
payload for a given topic is JSON with the schema `{"value": int32}`. If I
understand you correctly, the idea of this PR is to build a `RecordBatch` with
>
> ```
> ...
> ```
>
> and then convert it to something like
>
> ```
> ...
> ```
>
> using a `to_json` or something. Is this the direction you mean?
This is correct.
> If yes, note that creating a `BinaryArray` requires a complete `memcopy` of the `Vec<Vec<u8>>`, which is expensive. This is what I meant by "introducing unnecessary complexity". Wouldn't it be simpler to keep the `KafkaBatch` as is and use arrow's JSON reader to convert the blob directly to the corresponding arrow arrays?
I agree that there is a potential optimization here: parsing at the message level rather than forming a `BinaryArray` first.
I have a couple of concerns with this, though:
* I do think we should support output to `BinaryArray`, and I think it makes
sense that this low-level reader not include a lot of format-specific
functionality.
* `rdkafka` yields `BorrowedMessage`s, so forming batches from them will require a copy either way.
* De-serializing JSON is already not zero-copy, so I'm not sure how much
performance this buys us.
I will give this some thought for future work, though, because I think it
becomes more critical when reading zero-copy formats.
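To make the copy concrete, here is a minimal sketch (not code from this PR) of building a `BinaryArray` from payloads already collected out of `rdkafka` messages. `payloads_to_binary_array` is a hypothetical helper; it relies on arrow's `From<Vec<&[u8]>>` conversion, which copies every payload into the array's contiguous value buffer:

```rust
use arrow::array::BinaryArray;

/// Hypothetical helper, not part of this PR: build a BinaryArray from raw
/// Kafka payloads (e.g. bytes previously copied out of rdkafka's
/// `BorrowedMessage::payload()`). The From<Vec<&[u8]>> conversion copies
/// each payload into the array's contiguous value buffer, which is the
/// memcopy discussed above.
fn payloads_to_binary_array(payloads: &[Vec<u8>]) -> BinaryArray {
    let slices: Vec<&[u8]> = payloads.iter().map(|p| p.as_slice()).collect();
    BinaryArray::from(slices)
}
```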
> Which brings me to my initial question in item 3: IMO the core issue is not interoperability with `KafkaBatch`, but how to deserialize the payload into `Arrow`. This PR is focused on converting `KafkaBatch -> RecordBatch`. I am trying to understand the benefits of memcopying the whole payload from bytes into a `BinaryArray` instead of keeping the `KafkaBatch` as is.
I intended `KafkaBatch` to be an internal object used for constructing a `RecordBatch`. I don't think it would make sense as a component of the Arrow codebase if it yielded non-Arrow values. In my opinion, `BinaryArray` *is* the data type of Kafka payloads. Interpreting it as JSON requires context, and something this low-level should not be opinionated about that.
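To illustrate that separation of concerns, here is a sketch (again, not from this PR) of what a downstream, format-aware consumer might do with the `BinaryArray`, using `serde_json` as a stand-in for arrow's JSON reader or any other format-specific decoder:

```rust
use arrow::array::{Array, BinaryArray};
use serde_json::Value;

/// Illustrative only: interpreting the payload bytes as JSON is a separate,
/// format-specific step performed downstream of the low-level reader, which
/// itself only hands over raw bytes in a BinaryArray.
fn decode_json_payloads(payloads: &BinaryArray) -> Vec<Option<Value>> {
    (0..payloads.len())
        .map(|i| {
            if payloads.is_null(i) {
                None
            } else {
                // Any schema validation or typed decoding happens here, with
                // context the low-level reader does not have.
                serde_json::from_slice(payloads.value(i)).ok()
            }
        })
        .collect()
}
```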
> Wrt `async`: fearless concurrency ftw :) `DataFusion` already supports `async` scans. Kafka seems to be a good case for that. In this context, if I were doing this, my strategy would be to work out a `DataFusion` node that `async`-polls from a Kafka stream with the following configuration:
>
> * topic
>
> * expected payload format (e.g. JSON for start)
>
> * a number of rows per Batch (i.e. buffering)
>
> * expected schema
>
>
> and return `RecordBatch`es with the deserialized payload according to the
expected schema (using the corresponding converter).
This all makes sense, and aligns with the roadmap I mentioned above, but I
think it is out of scope for this PR. Think of this PR as implementing Rust's
`std::fs::File` for Kafka. Buffering, `async`, de-serialization, etc. all come
after that.
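Roughly the shape I have in mind, purely as an illustration (this is not the PR's actual API): the low-level reader plays the role `std::fs::File` plays for files, handing back raw bytes and nothing more, with buffering, `async` polling, and deserialization layered on top later:

```rust
/// Purely illustrative sketch, not the PR's actual API: a low-level Kafka
/// reader analogous to std::fs::File. It yields raw payload bytes only;
/// buffering, async polling, and format-aware decoding are built on top.
struct KafkaReader {
    // consumer handle, topic, partitions, offsets, ...
}

impl KafkaReader {
    /// Return the next raw payload, making no assumptions about its format.
    fn next_payload(&mut self) -> Option<Vec<u8>> {
        unimplemented!("sketch only")
    }
}
```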