kflansburg commented on pull request #8968: URL: https://github.com/apache/arrow/pull/8968#issuecomment-748616649
Hey @jorgecarleitao, thanks for the input.

> Wrt to the first point, we could use micro-batches, but doesn't that defeat the purpose of the Arrow format? The whole idea is based on the notion of data locality, low metadata footprint, and batch processing. All these are shredded in a micro-batch architecture.

I don't believe this is an accurate statement: while they are called "micro" batches, depending on the throughput of the topics you are consuming, they may contain tens of thousands of messages. Regardless, I think there is an argument for leveraging Arrow's compute functionality, and the ecosystem built around the format, even when processing smaller pieces of data.

> Wrt to the second point, wouldn't it make sense to build a `stream` instead of an `iterator`? Currently we will be blocking the thread waiting for something to happen on the topic, no?

To be clear, the blocking behavior is up to the user: they may set a `poll_timeout` of 0 to prevent any blocking. `librdkafka` runs a background thread where the actual messages are fetched from Kafka.

You mentioned that Kafka is event *driven*, and I do not believe that is the case. Kafka acts as a buffer which stores event messages. Consumers fetch events at the rate at which they wish to process them. If there are no new events, consumers periodically poll for new messages (this could be `async`, but I think that is out of scope here). This connector may also be used for mass-loading an entire topic's worth of messages, not just watching for new ones.

> Wrt to the third point, why adding the complexity of Arrow if in the end the payload is an opaque binary? The user would still have to convert that payload to the arrow format for compute (or is the idea to keep that as opaque?), so, IMO we are not really solving the problem: why would a user prefer to use a `RecordBatch` of N messages instead of a `Vec<KafkaBatch>` (or even better, not block the thread and just use a stream of `KafkaBatch`)?

According to the Kafka API, the payload *is* an opaque binary (which, I might add, is a supported data type in Arrow). As you mentioned, it is possible that the topic contains a custom encoding of data, and users should be able to access this raw data. Another case, however, is that the data is JSON or Avro (or Arrow Flight, as Andy suggested). As mentioned in the discussion above, I plan to allow the user to specify additional information when building the reader (schema, format) which will allow it to interpret the payload as JSON, etc. I wanted to keep this PR scoped to basic functionality for now; #8971 is another PR that I have opened to begin work on such functionality for JSON.

> I.e. there should be a stream adapter (a chunk iterator) that maps rows to a batch, either in arrow or in other format.

You've mentioned Streams several times, and I'm glad that you brought it up. I purposefully avoided `async` code here (it seems to me that most of the core Arrow library does not use `async` code?). It is totally possible to introduce a `Stream` API here as well (`rdkafka` has this already), but I think that a synchronous variant is more basic, and that users should have the choice of using `async` or not. A rough sketch of what that synchronous path could look like follows below.
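To make that concrete, here is a minimal sketch of the synchronous path: poll `librdkafka` with a caller-controlled timeout (zero for non-blocking), drain up to a batch's worth of messages, and pack the payloads into a `RecordBatch` with a single nullable `Binary` column. The names and signatures here are hypothetical, not the API in this PR, and the sketch assumes recent versions of the `rdkafka` and `arrow` crates:

```rust
use std::sync::Arc;
use std::time::Duration;

use arrow::array::{ArrayRef, BinaryArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use rdkafka::consumer::BaseConsumer;
use rdkafka::message::Message;

/// Hypothetical reader step: drain up to `max_batch_size` messages and pack
/// their payloads into a single-column RecordBatch. A `poll_timeout` of zero
/// makes this non-blocking: we only take what librdkafka's background thread
/// has already fetched.
fn next_batch(
    consumer: &BaseConsumer,
    max_batch_size: usize,
    poll_timeout: Duration,
) -> Result<Option<RecordBatch>, ArrowError> {
    let mut payloads: Vec<Option<Vec<u8>>> = Vec::with_capacity(max_batch_size);
    while payloads.len() < max_batch_size {
        match consumer.poll(poll_timeout) {
            // Kafka payloads are optional opaque bytes; keep nulls as nulls.
            Some(Ok(msg)) => payloads.push(msg.payload().map(|p| p.to_vec())),
            // Nothing buffered (or an error) ends this batch; a real
            // implementation would surface the error rather than swallow it.
            Some(Err(_)) | None => break,
        }
    }
    if payloads.is_empty() {
        return Ok(None);
    }
    // The only schema the Kafka API itself gives us: an opaque binary payload.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "payload",
        DataType::Binary,
        true,
    )]));
    let values: Vec<Option<&[u8]>> = payloads.iter().map(|p| p.as_deref()).collect();
    let columns: Vec<ArrayRef> = vec![Arc::new(BinaryArray::from_opt_vec(values))];
    RecordBatch::try_new(schema, columns).map(Some)
}
```

An `Iterator<Item = Result<RecordBatch, ArrowError>>` wrapping something like this is the synchronous variant; an `async` `Stream` adapter could later wrap the same logic on top of `rdkafka`'s `StreamConsumer` without changing the batch-building code.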
> But again, why bother with `Vec<KafkaBatch>` -> `RecordBatch`? A StructArray with a Binary field still needs to be parsed into an arrow array given a schema, and that is the most blocking operation of all of this. We would still need to agree upfront about the format of the payload, which is typically heterogeneous and use-case dependent (and seldom decided by the consumer).

I believe that this is the first step to achieving the more complex features that you are describing. This PR:

1. Loads data from Kafka.
2. Forms it into Arrow based on just what we know about the data from the Kafka API (opaque binary payloads).

In future work, we can:

* Introduce an `async` API.
* Automatically parse payloads.
* Add projections to pick out specific fields from Kafka messages.
* Build more complex KSQL-like behavior into DataFusion.

Hopefully you can see how this PR fits into that process.
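As an illustration of why a `RecordBatch` with an opaque `Binary` column is already useful on its own, downstream code can pull the raw bytes back out and apply whatever decoding the topic calls for (a custom encoding, JSON, Avro, etc.). This is a hypothetical consumer-side sketch, not part of this PR:

```rust
use arrow::array::{Array, BinaryArray};
use arrow::record_batch::RecordBatch;

/// Hypothetical downstream helper: read the opaque payload column from a
/// batch and decode each message with a caller-supplied function.
fn decode_payloads<T>(batch: &RecordBatch, decode: impl Fn(&[u8]) -> T) -> Vec<Option<T>> {
    let payloads = batch
        .column(0)
        .as_any()
        .downcast_ref::<BinaryArray>()
        .expect("payload column should be Binary");
    (0..payloads.len())
        .map(|i| {
            // Null payloads (e.g. tombstone messages) stay None.
            if payloads.is_null(i) {
                None
            } else {
                Some(decode(payloads.value(i)))
            }
        })
        .collect()
}
```

For a JSON topic, `decode` could be something like `|bytes| serde_json::from_slice::<serde_json::Value>(bytes)`; once the reader accepts a schema and format, that parsing would move inside the reader itself, which is the direction #8971 starts on.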
