kflansburg commented on pull request #8968:
URL: https://github.com/apache/arrow/pull/8968#issuecomment-748616649


   Hey @jorgecarleitao, thanks for the input. 
   
   > Wrt to the first point, we could use micro-batches, but doesn't that 
defeat the purpose of the Arrow format? The whole idea is based on the notion 
of data locality, low metadata footprint, and batch processing. All these are 
shredded in a micro-batch architecture.
   
   I don't believe this is an accurate statement. While they are called "micro" 
batches, depending on the throughput of the topics you are consuming, they 
may contain tens of thousands of messages. Regardless, I think there is an 
argument for leveraging Arrow's compute functionality, and the ecosystem built 
around the format, even when processing smaller pieces of data.
   
   > Wrt to the second point, wouldn't it make sense to build a `stream` 
instead of an `iterator`? Currently we will be blocking the thread waiting for 
something to happen on the topic, no?
   
   To be clear, the blocking behavior is up to the user: they may set a 
`poll_timeout` of 0 to prevent any blocking. `librdkafka` runs a background 
thread where the actual messages are fetched from Kafka. You mentioned that 
Kafka is event *driven*, and I don't believe that is the case. Kafka acts as a 
buffer which stores event messages, and consumers fetch events at the rate at 
which they wish to process them. If there are no new events, consumers 
periodically poll for new messages (this could be `async`, but I think that is 
out of scope here). This connector may also be used for mass-loading an entire 
topic's worth of messages, not just watching for new ones.
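   To illustrate the non-blocking case, here is a minimal sketch against 
`rdkafka`'s `BaseConsumer` (`drain_available` is a hypothetical helper, not 
code from this PR):
   
   ```rust
   use std::time::Duration;
   
   use rdkafka::consumer::BaseConsumer;
   use rdkafka::message::Message;
   
   // Drain whatever librdkafka's background thread has already fetched,
   // without ever blocking the calling thread.
   fn drain_available(consumer: &BaseConsumer) -> Vec<Vec<u8>> {
       let mut payloads = Vec::new();
       // A zero timeout makes poll() return immediately. We stop when nothing
       // is buffered locally (or on the first error, which a real
       // implementation would handle rather than swallow).
       while let Some(Ok(msg)) = consumer.poll(Duration::from_millis(0)) {
           if let Some(payload) = msg.payload() {
               payloads.push(payload.to_vec());
           }
       }
       payloads
   }
   ```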
   
   > Wrt to the third point, why adding the complexity of Arrow if in the end 
the payload is an opaque binary? The user would still have to convert that 
payload to the arrow format for compute (or is the idea to keep that as 
opaque?), so, IMO we are not really solving the problem: why would a user 
prefer to use a `RecordBatch` of N messages instead of a `Vec<KafkaBatch>` (or 
even better, not block the thread and just use a stream of `KafkaBatch`?
   
   According to the Kafka API, the payload *is* an opaque binary (which, I might 
add, is a supported data type in Arrow). As you mentioned, it is possible that 
the topic contains a custom encoding of data, and users should be able to 
access this raw data. Another case, however, is that the data is JSON or Avro 
(or Arrow Flight, as Andy suggested). As mentioned in the discussion above, I 
plan to allow the user to specify additional information when building the 
reader (schema, format), which will allow it to interpret the payload as JSON, 
etc. I wanted to keep this PR scoped to basic functionality for now. #8971 is 
another PR that I have opened to begin work on such functionality for JSON.
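   As a rough sketch of what "opaque binary in Arrow" means here (the function 
name and single-column schema are illustrative, not this PR's exact code):
   
   ```rust
   use std::sync::Arc;
   
   use arrow::array::BinaryArray;
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::record_batch::RecordBatch;
   
   // One Kafka message payload per row, stored as variable-length binary.
   fn payloads_to_batch(payloads: Vec<&[u8]>) -> arrow::error::Result<RecordBatch> {
       let schema = Arc::new(Schema::new(vec![
           // Nullable, since Kafka payloads can be null (e.g. tombstones).
           Field::new("payload", DataType::Binary, true),
       ]));
       let array = BinaryArray::from(payloads);
       RecordBatch::try_new(schema, vec![Arc::new(array)])
   }
   ```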
   
   > I.e. there should be a stream adapter (a chunk iterator) that maps rows to 
a batch, either in arrow or in other format.
   
   You've mentioned Streams several times, and I'm glad that you brought it up. 
I purposely avoided `async` code here (it seems to me that most of the core 
Arrow library does not use `async`?). It is entirely possible to introduce 
a `Stream` API here as well (`rdkafka` already has one), but I think a 
synchronous variant is the more fundamental building block, and users should 
have the choice of whether or not to use `async`.
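   The shape I have in mind is a plain blocking `Iterator`, which an `async` 
`Stream` adapter could wrap later without touching the core. A sketch (the 
struct and field names are hypothetical):
   
   ```rust
   use std::time::Duration;
   
   use rdkafka::consumer::BaseConsumer;
   use rdkafka::message::OwnedMessage;
   
   struct MessageIter<'a> {
       consumer: &'a BaseConsumer,
       poll_timeout: Duration,
   }
   
   impl<'a> Iterator for MessageIter<'a> {
       type Item = OwnedMessage;
   
       fn next(&mut self) -> Option<Self::Item> {
           // Blocks for at most `poll_timeout`; a zero timeout never blocks.
           // Errors are dropped here for brevity; real code would surface them.
           self.consumer
               .poll(self.poll_timeout)
               .and_then(|result| result.ok())
               .map(|msg| msg.detach())
       }
   }
   ```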
   
   
   > But again, why bother with Vec<KafkaBatch> -> RecordBatch? A StructArray 
with a Binary field still needs to be parsed into an arrow array given a 
schema, and that is the most blocking operation of all of this. We would still 
need to agree upfront about the format of the payload, which is typically 
heterogeneous and use-case dependent (and seldomly decided by the consumer).
   
   I believe that this is the first step toward the more complex features 
that you are describing. This PR:
   
   1. Loads data from Kafka.
   2. Forms it into Arrow based only on what we know about the data from the 
Kafka API (opaque binary payloads); see the schema sketch below.
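   
   Something like the following (the field names and timestamp unit are my 
assumptions for illustration, not a final schema):
   
   ```rust
   use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
   
   // Everything the Kafka API itself guarantees about a message, with the
   // key and payload left as opaque binary.
   fn kafka_message_schema() -> Schema {
       Schema::new(vec![
           Field::new("key", DataType::Binary, true),
           Field::new("payload", DataType::Binary, true),
           Field::new("topic", DataType::Utf8, false),
           Field::new("partition", DataType::Int32, false),
           Field::new("offset", DataType::Int64, false),
           Field::new(
               "timestamp",
               DataType::Timestamp(TimeUnit::Millisecond, None),
               true,
           ),
       ])
   }
   ```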
   
   In future work, we can:
   
   * Introduce an `async` API.
   * Automatically parse payloads.
   * Add projections to pick out specific fields from Kafka messages.
   * Build more complex KSQL-like behavior into DataFusion.
   
   Hopefully you can see how this PR fits into that process. 

