I really need to "get into the zone" on some other development today, but I want to remind us of something earlier in the thread that gave me the impression I wasn't stomping on too many paradigms with this proposal:
Wes: ``So the "length" field in RecordBatch is already the utilized number of rows. The body buffers can certainly have excess unused space. So your application can mutate Flatbuffer "length" field in-place as new records are filled in.''

If RecordBatch.length is the utilized number of rows then my PR makes this actually true. Yes, we need it in a handful of implementations. I'm willing to provide all of them. To me that is the lowest complexity solution.

-John

On Wed, Oct 16, 2019 at 10:45 AM Wes McKinney <wesmck...@gmail.com> wrote:

> On Wed, Oct 16, 2019 at 10:17 AM John Muehlhausen <j...@jgm.org> wrote:
> >
> > "pyarrow is intended as a developer-facing library, not a user-facing one"
> >
> > Is that really the core issue? I doubt you would want to add this proposed logic to pandas even though it is user-facing, because then pandas will either have to re-implement what it means to read a batch (to respect length when it is smaller than array length) or else rely on the single blessed custom metadata for doing this, which doesn't make it custom anymore.
>
> What you have proposed in your PR amounts to an alteration of the IPC format to suit this use case. This pushes complexity onto _every_ implementation that will need to worry about a "truncated" record batch. I'd rather avoid this unless it is truly the only way.
>
> Note that we serialize a significant amount of custom metadata already to address pandas-specific issues, and have not had to make any changes to the columnar format as a result.
>
> > I think really your concern is that perhaps nobody wants this but me, therefore it should not be in arrow or pandas regardless of whether it is user-facing? But, if that is your thinking, is it true? What is our solution to the locality/latency problem for systems that ingest and process concurrently, if not this solution? I do see it as a general problem that needs at least the beginnings of a general solution... not a "custom" one.
>
> We use the custom_metadata fields to implement a number of built-in things in the project, such as extension types. If enough people find this useful, then it can be promoted to a formalized concept. As far as I can tell, you have developed quite a bit of custom code related to this for your application, including manipulating Flatbuffers metadata in place to maintain the populated length, so the barrier to entry to being able to properly take advantage of this is rather high.
>
> > Also, I wonder whether it is true that pyarrow avoids smart/magical things. The entire concept of a "Table" seems to be in that category? The docs specifically mention that it is for convenience.
>
> Table arose out of legitimate developer need. There are a number of areas of the project that would be much more difficult if we had to worry about regularizing column chunking at any call site that returns an in-memory dataset.
>
> > I'd like to focus on two questions:
> >
> > 1- What is the Arrow general solution to the locality/latency tradeoff problem for systems that ingest and process data concurrently? This proposed solution or something else? Or if we propose not to address the problem, why?
> >
> > 2- What will the proposed change negatively impact? It seems that all we are talking about is respecting batch length if arrays happen to be longer.
>
> I'm suggesting to help you solve the post-read truncation problem without modifying the IPC protocol. If you want to make things work for the users without knowledge, I think this can be achieved through a plug-in API to define a metadata handler-callback to apply the truncation to the record batches.
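(To make that post-read truncation concrete, here is a minimal pyarrow sketch. The key name "populated_length", its placement in the schema-level metadata, and the toy schema are illustrative choices only, not a blessed convention, and no plug-in/callback API is assumed to exist.)

import pyarrow as pa

# Illustrative key; carried in schema-level metadata here purely for brevity.
schema = pa.schema(
    [pa.field("px", pa.float64()), pa.field("qty", pa.int64())],
    metadata={"populated_length": "2"},
)

# Writer side: a pre-allocated 4-row batch of which only 2 rows are populated.
batch = pa.RecordBatch.from_arrays(
    [pa.array([101.5, 102.0, 0.0, 0.0]), pa.array([10, 20, 0, 0])],
    schema=schema,
)
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, schema) as writer:
    writer.write_batch(batch)

# Reader side: truncate after reading by slicing off the unused tail.
# The slice is zero-copy and the IPC protocol itself is untouched.
reader = pa.ipc.open_stream(sink.getvalue())
n = int(reader.schema.metadata[b"populated_length"])
for raw in reader:
    truncated = raw.slice(0, n)
    assert truncated.num_rows == 2

(A writer whose batches fill to different lengths would presumably carry the count in per-batch custom metadata rather than in the schema, but the slicing step is the same.)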
> > Thanks,
> > -John
> >
> > On Wed, Oct 16, 2019 at 8:37 AM Wes McKinney <wesmck...@gmail.com> wrote:
> >
> > > hi John,
> > >
> > > > As a practical matter, the reason metadata is not a good solution for me is that it requires awareness on the part of the reader. I want (e.g.) a researcher in Python to be able to map a file of batches in IPC format without needing to worry about the fact that the file was built in a streaming fashion and therefore has some unused array elements.
> > >
> > > I don't find this argument to be persuasive.
> > >
> > > pyarrow is intended as a developer-facing library, not a user-facing one. I don't think you should be having the kinds of users you are describing using pyarrow directly, instead consuming the library through a layer above it. Specifically, we are deliberately avoiding doing anything too "smart" or "magical", instead maintaining tight developer control over what is going on.
> > >
> > > - Wes
> > >
> > > On Wed, Oct 16, 2019 at 2:18 AM Micah Kornfield <emkornfi...@gmail.com> wrote:
> > > >
> > > > Still thinking through the implications here, but to save others from having to go search, [1] is the PR.
> > > >
> > > > [1] https://github.com/apache/arrow/pull/5663/files
> > > >
> > > > On Tue, Oct 15, 2019 at 1:42 PM John Muehlhausen <j...@jgm.org> wrote:
> > > > >
> > > > > A proposal with linked PR now exists in ARROW-5916 and Wes commented that we should kick it around some more.
> > > > >
> > > > > The high-level topic is how Apache Arrow intersects with streaming methodologies:
> > > > >
> > > > > If record batches are strictly immutable, a difficult trade-off is created for streaming data collection: either I can have low-latency presentation of new data by appending very small batches (often 1 row) to the IPC stream and lose columnar layout benefits, or I can have high-latency presentation of new data by waiting to append a batch until it is large enough to gain significant columnar layout benefits. During this waiting period the new data is unavailable to processing.
> > > > >
> > > > > If, on the other hand, [0,length) of a batch is immutable but length may increase, the trade-off is eliminated: I can pre-allocate a batch and populate records in it when they occur (without waiting), and also gain columnar benefits as each "closed" batch will be large. (A batch may be practically "closed" before the arrays are full when the projection of variable-length buffer space is wrong... a space/time tradeoff in favor of time.)
> > > > >
> > > > > Looking ahead to a day when the reference implementation(s) will be able to bump RecordBatch.length while populating pre-allocated records in-place, ARROW-5916 reads such batches by ignoring portions of arrays that are beyond RecordBatch.length.
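(A rough sketch of what "populate in place, grow only the logical length" can look like today for a single fixed-width, non-nullable column -- this is not the ARROW-5916 change itself, and the helper names and the capacity of 1024 are made up for illustration.)

import numpy as np
import pyarrow as pa

CAPACITY = 1024
storage = np.zeros(CAPACITY, dtype=np.int64)   # pre-allocated, mutable backing buffer
length = 0                                     # the only thing that "grows"

def append(value):
    # Populate the next row slot in place; [0, length) stays immutable.
    global length
    storage[length] = value
    length += 1

def populated_view():
    # Zero-copy Arrow view over only the populated prefix; rows beyond
    # `length` exist physically but are never exposed to the reader.
    return pa.Array.from_buffers(pa.int64(), length, [None, pa.py_buffer(storage)])

append(7)
append(11)
print(populated_view())   # -> [7, 11]

(Variable-length columns are where this gets harder, which is what much of the quoted discussion below turns on.)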
> > > > > If we are not looking ahead to such a day, the discussion is about the alternative way that Arrow will avoid the latency/locality tradeoff inherent in streaming data collection. Or, if the answer is "streaming apps are and will always be out of scope", that idea needs to be defended from the observation that practitioners are moving more towards the fusion of batch and streaming, not away from it.
> > > > >
> > > > > As a practical matter, the reason metadata is not a good solution for me is that it requires awareness on the part of the reader. I want (e.g.) a researcher in Python to be able to map a file of batches in IPC format without needing to worry about the fact that the file was built in a streaming fashion and therefore has some unused array elements.
> > > > >
> > > > > The change itself seems relatively simple. What negative consequences do we anticipate, if any?
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Fri, Jul 5, 2019 at 10:42 AM John Muehlhausen <j...@jgm.org> wrote:
> > > > > >
> > > > > > This seems to help... still testing it though.
> > > > > >
> > > > > > Status GetFieldMetadata(int field_index, ArrayData* out) {
> > > > > >   auto nodes = metadata_->nodes();
> > > > > >   // pop off a field
> > > > > >   if (field_index >= static_cast<int>(nodes->size())) {
> > > > > >     return Status::Invalid("Ran out of field metadata, likely malformed");
> > > > > >   }
> > > > > >   const flatbuf::FieldNode* node = nodes->Get(field_index);
> > > > > >
> > > > > >   // out->length = node->length();
> > > > > >   out->length = metadata_->length();  // use the batch-level length instead of the node length
> > > > > >   out->null_count = node->null_count();
> > > > > >   out->offset = 0;
> > > > > >   return Status::OK();
> > > > > > }
> > > > > >
> > > > > > On Fri, Jul 5, 2019 at 10:24 AM John Muehlhausen <j...@jgm.org> wrote:
> > > > > >>
> > > > > >> So far it seems as if pyarrow is completely ignoring the RecordBatch.length field. More info to follow...
> > > > > >>
> > > > > >> On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen <j...@jgm.org> wrote:
> > > > > >>>
> > > > > >>> Crikey! I'll do some testing around that and suggest some test cases to ensure it continues to work, assuming that it does.
> > > > > >>>
> > > > > >>> -John
> > > > > >>>
> > > > > >>> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > > > >>>>
> > > > >>>> Thanks for the attachment, it's helpful.
> > > > >>>>
> > > > >>>> On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen <j...@jgm.org> wrote:
> > > > >>>> >
> > > > >>>> > Attachments referred to in previous two messages:
> > > > >>>> > https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0
> > > > >>>> >
> > > > >>>> > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen <j...@jgm.org> wrote:
> > > > >>>> > >
> > > > >>>> > > Thanks, Wes, for the thoughtful reply. I really appreciate the engagement. In order to clarify things a bit, I am attaching a graphic of how our application will take record-wise (row-oriented) data from an event source and incrementally populate a pre-allocated Arrow-compatible buffer, including for variable-length fields. (Obviously at this stage I am not using the reference implementation Arrow code, although that would be a goal.... to contribute that back to the project.)
> > > > >>>> > >
> > > > >>>> > > For sake of simplicity these are non-nullable fields. As a result a reader of "y" that has no knowledge of the "utilized" metadata would get a long string (zeros, spaces, uninitialized, or whatever we decide for the pre-allocation model) for the record just beyond the last utilized record.
> > > > >>>> > >
> > > > >>>> > > I don't see any "big O"-analysis problems with this approach. The space/time tradeoff is that we have to guess how much room to allocate for variable-length fields. We will probably almost always be wrong. This ends up in "wasted" space. However, we can do calculations based on these partially filled batches that take full advantage of the columnar layout. (Here I've shown the case where we had too little variable-length buffer set aside, resulting in "wasted" rows. The flip side is that rows achieve full [1] utilization but there is wasted variable-length buffer if we guess incorrectly in the other direction.)
> > > > >>>> > >
> > > > >>>> > > I proposed a few things that are "nice to have" but really what I'm eyeing is the ability for a reader-- any reader (e.g. pyarrow)-- to see that some of the rows in a RecordBatch are not to be read, based on the new "utilized" (or whatever name) metadata. That single tweak to the metadata-- and readers honoring it-- is the core of the proposal. (Proposal 4.) This would indicate that the attached example (or something similar) is the blessed approach for those seeking to accumulate events and process them while still expecting more data, with the heavier-weight task of creating a new pre-allocated batch being a rare occurrence.
> > > > >>>>
> > > > >>>> So the "length" field in RecordBatch is already the utilized number of rows. The body buffers can certainly have excess unused space. So your application can mutate Flatbuffer "length" field in-place as new records are filled in.
> > > > >>>>
> > > > >>>> > > Notice that the mutability is only in the sense of "appending." The current doctrine of total immutability would be revised to refer to the immutability of only the already-populated rows.
> > > > >>>> > >
> > > > >>>> > > It gives folks an option other than choosing the lesser of two evils: on the one hand, length 1 RecordBatches that don't result in a stream that is computationally efficient. On the other hand, adding artificial latency by accumulating events before "freezing" a larger batch and only then making it available to computation.
> > > > >>>> > >
> > > > >>>> > > -John
> > > > >>>> > >
> > > > >>>> > > On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > > > >>>> > >>
> > > > >>>> > >> hi John,
> > > > >>>> > >>
> > > > >>>> > >> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen <j...@jgm.org> wrote:
> > > > >>>> > >> >
> > > > >>>> > >> > During my time building financial analytics and trading systems (23 years!), both the "batch processing" and "stream processing" paradigms have been extensively used by myself and by colleagues.
> > > > >>>> > >> >
> > > > >>>> > >> > Unfortunately, the tools used in these paradigms have not successfully overlapped. For example, an analyst might use a Python notebook with pandas to do some batch analysis. Then, for acceptable latency and throughput, a C++ programmer must implement the same schemas and processing logic in order to analyze real-time data for real-time decision support. (Time horizons often being sub-second or even sub-millisecond for an acceptable reaction to an event. The most aggressive software-based systems, leaving custom hardware aside other than things like kernel-bypass NICs, target 10s of microseconds for a full round trip from data ingestion to decision.)
> > > > >>>> > >> >
> > > > >>>> > >> > As a result, TCO is more than doubled. A doubling can be accounted for by two implementations that share little or nothing in the way of architecture. Then additional effort is required to ensure that these implementations continue to behave the same way and are upgraded in lock-step.
> > > > >>>> > >> >
> > > > >>>> > >> > Arrow purports to be a "bridge" technology that eases one of the pain points of working in different ecosystems by providing a common event stream data structure. (Discussion of common processing techniques is beyond the scope of this discussion. Suffice it to say that a streaming algo can always be run in batch, but not vice versa.)
> > > > >>>> > >> >
> > > > >>>> > >> > Arrow seems to be growing up primarily in the batch processing world. One publication notes that "the missing piece is streaming, where the velocity of incoming data poses a special challenge. There are some early experiments to populate Arrow nodes in microbatches..." [1] Part of our discussion could be a response to this observation.
> > > > >>>> > >> > In what ways is it true or false? What are the plans to remedy this shortcoming, if it exists? What steps can be taken now to ease the transition to low-latency streaming support in the future?
> > > > >>>> > >>
> > > > >>>> > >> Arrow columnar format describes a collection of records with values between records being placed adjacent to each other in memory. If you break that assumption, you don't have a columnar format anymore. So I don't see where the "shortcoming" is. We don't have any software in the project for managing the creation of record batches in a streaming application, but this seems like an interesting development expansion area for the project.
> > > > >>>> > >>
> > > > >>>> > >> Note that many contributors have already expanded the surface area of what's in the Arrow libraries in many directions.
> > > > >>>> > >>
> > > > >>>> > >> Streaming data collection is yet another area of expansion, but _personally_ it is not on the short list of projects that I will personally be working on (or asking my direct or indirect colleagues to work on). Since this is a project made up of volunteers, it's up to contributors to drive new directions for the project by writing design documents and pull requests.
> > > > >>>> > >>
> > > > >>>> > >> > In my own experience, a successful strategy for stream processing where context (i.e. recent past events) must be considered by calculations is to pre-allocate memory for event collection, to organize this memory in a columnar layout, and to run incremental calculations at each event ingress into the partially populated memory. [Fig 1] When the pre-allocated memory has been exhausted, allocate a new batch of column-wise memory and continue. When a batch is no longer pertinent to the calculation look-back window, free the memory back to the heap or pool.
> > > > >>>> > >> >
> > > > >>>> > >> > Here we run into the first philosophical barrier with Arrow, where "Arrow data is immutable." [2] There is currently little or no consideration for reading a partially constructed RecordBatch, e.g. one with only some of the rows containing event data at the present moment in time.
> > > > >>>> > >>
> > > > >>>> > >> It seems like the use case you have heavily revolves around mutating pre-allocated, memory-mapped datasets that are being consumed by other processes on the same host. So you want to incrementally fill some memory-mapped data that you've already exposed to another process.
> > > > >>>> > >>
> > > > >>>> > >> Because of the memory layout for variable-size and nested cells, it is impossible in general to mutate Arrow record batches. This is not a philosophical position: this was a deliberate technical decision to guarantee data locality for scans and predictable O(1) random access on variable-length and nested data.
> > > > >>>> > >>
> > > > >>>> > >> Technically speaking, you can mutate memory in-place for fixed-size types in-RAM or on-disk, if you want to. It's an "off-label" use case but no one is saying you can't do this.
> > > > >>>> > >>
> > > > >>>> > >> > Proposal 1: Shift the Arrow "immutability" doctrine to apply to populated records of a RecordBatch instead of to all records?
> > > > >>>> > >>
> > > > >>>> > >> Per above, this is impossible in generality. You can't alter variable-length or nested records without rewriting the record batch.
> > > > >>>> > >>
> > > > >>>> > >> > As an alternative approach, RecordBatch can be used as a single Record (batch length of one). [Fig 2] In this approach the benefit of the columnar layout is lost for look-back window processing.
> > > > >>>> > >> >
> > > > >>>> > >> > Another alternative approach is to collect an entire RecordBatch before stepping through it with the stream processing calculation. [Fig 3] With this approach some columnar processing benefit can be recovered, however artificial latency is introduced. As tolerance for delays in decision support dwindles, this model will be of increasingly limited value. It is already unworkable in many areas of finance.
> > > > >>>> > >> >
> > > > >>>> > >> > When considering the Arrow format and variable length values such as strings, the pre-allocation approach (and subsequent processing of a partially populated batch) encounters a hiccup. How do we know the amount of buffer space to pre-allocate? If we allocate too much buffer for variable-length data, some of it will be unused. If we allocate too little buffer for variable-length data, some row entities will be unusable. (Additional "rows" remain but when populating string fields there is no longer string storage space to point them to.)
> > > > >>>> > >> >
> > > > >>>> > >> > As with many optimization space/time tradeoff problems, the solution seems to be to guess. Pre-allocation sets aside variable length buffer storage based on the typical "expected size" of the variable length data. This can result in some unused rows, as discussed above. [Fig 4] In fact it will necessarily result in one unused row unless the last of each variable length field in the last row exactly fits into the remaining space in the variable length data buffer. Consider the case where there is more variable length buffer space than data:
> > > > >>>> > >> >
> > > > >>>> > >> > Given variable-length field x, last row index of y, variable length buffer v, beginning offset into v of o:
> > > > >>>> > >> > x[y] begins at o
> > > > >>>> > >> > x[y] ends at the offset of the next record; there is no next record, so x[y] ends after the total remaining area in the variable length buffer... however, this is too much!
> > > > >>>> > >>
> > > > >>>> > >> It isn't clear to me what you're proposing. It sounds like you want a major redesign of the columnar format to permit in-place mutation of strings. I doubt that would be possible at this point.
> > > > >>>> > >>
> > > > >>>> > >> > Proposal 2: [low priority] Create an "expected length" statistic in the Schema for variable length fields?
> > > > >>>> > >> >
> > > > >>>> > >> > Proposal 3: [low priority] Create metadata to store the index into variable-length data that represents the end of the value for the last record? Alternatively: a row is "wasted," however pre-allocation is inexact to begin with.
> > > > >>>> > >> >
> > > > >>>> > >> > Proposal 4: Add metadata to indicate to a RecordBatch reader that only some of the rows are to be utilized. [Fig 5] This is useful not only when processing a batch that is still under construction, but also for "closed" batches that were not able to be fully populated due to an imperfect projection of variable length storage.
> > > > >>>> > >> >
> > > > >>>> > >> > On this last proposal, Wes has weighed in:
> > > > >>>> > >> >
> > > > >>>> > >> > "I believe your use case can be addressed by pre-allocating record batches and maintaining application level metadata about what portion of the record batches has been 'filled' (so the unfilled records can be dropped by slicing). I don't think any change to the binary protocol is warranted."
> > > > >>>> > >> > [3]
> > > > >>>> > >>
> > > > >>>> > >> My personal opinion is that a solution to the problem you have can be composed from the components (combined with some new pieces of code) that we have developed in the project already.
> > > > >>>> > >>
> > > > >>>> > >> So the "application level" could be an add-on C++ component in the Apache Arrow project. Call it a "memory-mapped streaming data collector" that pre-allocates on-disk record batches (of only fixed-size or even possibly dictionary-encoded types) and then fills them incrementally as bits of data come in, updating some auxiliary metadata that other processes can use to determine what portion of the Arrow IPC messages to "slice off".
> > > > >>>> > >>
> > > > >>>> > >> > Concerns with positioning this at the app level:
> > > > >>>> > >> >
> > > > >>>> > >> > 1- Do we need to address or begin to address the overall concern of how Arrow data structures are to be used in "true" (non-microbatch) streaming environments, cf [1] in the last paragraph, as a *first-class* usage pattern? If so, is now the time?
> > > > >>>> > >> > > if you break that design invariant you don't have a columnar format anymore.
> > > > >>>> > >>
> > > > >>>> > >> Arrow provides a binary protocol for describing payload data on the wire (or on-disk, or in-memory, all the same). I don't see how it is in conflict with streaming environments, unless the streaming application has difficulty collecting multiple records into Arrow record batches. In that case, it's a system trade-off. Currently people are using Avro with Kafka and sending one record at a time, but then they're also spending a lot of CPU cycles in serialization.
> > > > >>>> > >>
> > > > >>>> > >> > 2- If we can even make broad-stroke attempts at data structure features that are likely to be useful when streaming becomes a first class citizen, it reduces the chances of "breaking" format changes in the future. I do not believe the proposals place an undue hardship on batch processing paradigms. We are currently discussing making a breaking change to the IPC format [4], so there is a window of opportunity to consider features useful for streaming? (Current clients can feel free to ignore the proposed "utilized" metadata of RecordBatch.)
> > > > >>>> > >>
> > > > >>>> > >> I think the perception that streaming is not a first class citizen is an editorialization (e.g. the article you cited was an editorial written by an industry analyst based on an interview with Jacques and me). Columnar data formats in general are designed to work with more than one value at a time (which we are calling a "batch" but I think that's conflating terminology with the "batch processing" paradigm of Hadoop, etc.),
> > > > >>>> > >>
> > > > >>>> > >> > 3- Part of the promise of Arrow is that applications are not a world unto themselves, but interoperate with other Arrow-compliant systems. In my case, I would like users to be able to examine RecordBatches in tools such as pyarrow without needing to be aware of any streaming app-specific metadata. For example, a researcher may pull in an IPC "File" containing N RecordBatch messages corresponding to those in Fig 4. I would very much like for this casual user to not have to apply N slice operations based on out-of-band data to get to the data that is relevant.
> > > > >>>> > >>
> > > > >>>> > >> Per above, should this become a standard enough use case, I think that code can be developed in the Apache project to address it.
> > > > >>>> > >>
> > > > >>>> > >> > Devil's advocate:
> > > > >>>> > >> >
> > > > >>>> > >> > 1- Concurrent access to a mutable (growing) RecordBatch will require synchronization of some sort to get consistent metadata reads. Since the above proposals do not specify how this synchronization will occur for tools such as pyarrow (we can imagine a Python user getting synchronized access to File metadata and mapping a read-only area before the writer is allowed to continue "appending" to this batch, or batches to this File), some "unusual" code will be required anyway, so what is the harm of consulting side-band data for slicing all the batches as part of this "unusual" code? [Potential response: Yes, but it is still one less thing to worry about, and perhaps first-class support for common synchronization patterns can be forthcoming? These patterns may not require further format changes?]
> > > > >>>> > >> >
> > > > >>>> > >> > My overall concern is that I see a lot of wasted effort dealing with the "impedance mismatch" between batch oriented and streaming systems. I believe that "best practices" will begin (and continue!) to prefer tools that help bridge the gap. Certainly this is the case in my own work. I agree with the appraisal at the end of the ZDNet article. If the above is not a helpful solution, what other steps can be made? Or if Arrow is intentionally confined to batch processing for the foreseeable future (in terms of first-class support), I'm interested in the rationale. Perhaps the feeling is that we avoid scope creep now (which I understand can be never-ending) even if it means a certain breaking change in the future?
> > > > >>>> > >>
> > > > >>>> > >> There are some semantic issues with what "streaming" and "batch" mean. When people see "streaming" nowadays they think "Kafka" (or Kafka-like). Single events flow in and out of streaming computation nodes (e.g. like https://apache.github.io/incubator-heron/ or others). The "streaming" is more about computational semantics than data representation.
> > > > >>>> > >>
> > > > >>>> > >> The Arrow columnar format fundamentally deals with multiple records at a time (you can have a record batch with size 1, but that is not going to be efficient). But I do not think Arrow is "intentionally confined" to batch processing. If it makes sense to use a columnar format to represent data in a streaming application, then you can certainly use it for that. I'm aware of people successfully using Arrow with Kafka, for example.
> > > > >>>> > >>
> > > > >>>> > >> - Wes
> > > > >>>> > >>
> > > > >>>> > >> > Who else encounters the need to mix/match batch and streaming, and what are your experiences?
> > > > >>>> > >> >
> > > > >>>> > >> > Thanks for the further consideration and discussion!
> > > > >>>> > >> >
> > > > >>>> > >> > [1] https://zd.net/2H0LlBY
> > > > >>>> > >> > [2] https://arrow.apache.org/docs/python/data.html
> > > > >>>> > >> > [3] https://bit.ly/2J5sENZ
> > > > >>>> > >> > [4] https://bit.ly/2Yske8L
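(Appended illustration, since the offsets arithmetic quoted above is easy to see concretely in pyarrow: value i of a string column occupies values[offsets[i]:offsets[i+1]], so the logical row count, not the physical size of the value buffer, is what bounds the data. The strings below are made up.)

import pyarrow as pa

arr = pa.array(["ab", "c", "defg"])     # no nulls, so the validity buffer is None
validity, offsets, values = arr.buffers()

# Little-endian int32 offsets 0, 2, 3, 7: row i is values[offsets[i]:offsets[i+1]].
print(offsets.to_pybytes())

# A pre-allocated value buffer larger than offsets[-1] simply has a tail that no
# row references; if the guess is too small, the remaining row slots cannot be
# populated -- the space/time trade-off described in the quoted discussion.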