I really need to "get into the zone" on some other development today, but I
want to remind us of something earlier in the thread that gave me the
impression I wasn't stomping on too many paradigms with this proposal:

Wes: ``So the "length" field in RecordBatch is already the utilized number
of rows. The body buffers can certainly have excess unused space. So
your application can mutate Flatbuffer "length" field in-place as
new records are filled in.''

If RecordBatch.length is the utilized number of rows then my PR makes this
actually true.  Yes, we need it in a handful of implementations.  I'm
willing to provide all of them.  To me that is the lowest complexity
solution.
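
To make "respecting RecordBatch.length" concrete, here is a minimal sketch
of the reader-side behavior I have in mind.  It is not Arrow code: Batch,
Column and VisibleRows are hypothetical stand-ins.  The only point is that
the visible row count comes from the batch-level length rather than from
the (possibly larger) pre-allocated array.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins, not the Arrow classes.
struct Column {
  std::vector<int64_t> values;  // pre-allocated; may hold unused trailing slots
};

struct Batch {
  int64_t length = 0;           // utilized number of rows
  std::vector<Column> columns;
};

// Returns the rows of one column that a reader should expose:
// everything in [0, batch.length), ignoring slots beyond it.
std::vector<int64_t> VisibleRows(const Batch& batch, std::size_t column_index) {
  const Column& col = batch.columns.at(column_index);
  assert(batch.length >= 0);
  assert(static_cast<std::size_t>(batch.length) <= col.values.size());
  return std::vector<int64_t>(col.values.begin(),
                              col.values.begin() + batch.length);
}
```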

-John

On Wed, Oct 16, 2019 at 10:45 AM Wes McKinney <wesmck...@gmail.com> wrote:

> On Wed, Oct 16, 2019 at 10:17 AM John Muehlhausen <j...@jgm.org> wrote:
> >
> > "pyarrow is intended as a developer-facing library, not a user-facing one"
> >
> > Is that really the core issue?  I doubt you would want to add this proposed
> > logic to pandas even though it is user-facing, because then pandas will
> > either have to re-implement what it means to read a batch (to respect
> > length when it is smaller than array length) or else rely on the single
> > blessed custom metadata for doing this, which doesn't make it custom
> > anymore.
>
> What you have proposed in your PR amounts to an alteration of the IPC
> format to suit this use case. This pushes complexity onto _every_
> implementation that will need to worry about a "truncated" record
> batch. I'd rather avoid this unless it is truly the only way.
>
> Note that we serialize a significant amount of custom metadata already
> to address pandas-specific issues, and have not had to make any
> changes to the columnar format as a result.
>
> > I think really your concern is that perhaps nobody wants this but me,
> > therefore it should not be in arrow or pandas regardless of whether it is
> > user-facing?  But, if that is your thinking, is it true?  What is our
> > solution to the locality/latency problem for systems that ingest and
> > process concurrently, if not this solution?  I do see it as a general
> > problem that needs at least the beginnings of a general solution... not a
> > "custom" one.
>
> We use the custom_metadata fields to implement a number of built-in
> things in the project, such as extension types. If enough people find
> this useful, then it can be promoted to a formalized concept. As far
> as I can tell, you have developed quite a bit of custom code related
> to this for your application, including manipulating Flatbuffers
> metadata in place to maintain the populated length, so the barrier to
> entry to being able to properly take advantage of this is rather high.
>
> > Also, I wonder whether it is true that pyarrow avoids smart/magical
> > things.  The entire concept of a "Table" seems to be in that category?
> > The docs specifically mention that it is for convenience.
> >
>
> Table arose out of legitimate developer need. There are a number of
> areas of the project that would be much more difficult if we had to
> worry about regularizing column chunking at any call site that returns
> an in-memory dataset.
>
> > I'd like to focus on two questions:
> > 1- What is the Arrow general solution to the locality/latency tradeoff
> > problem for systems that ingest and process data concurrently?  This
> > proposed solution or something else?  Or if we propose not to address the
> > problem, why?
> > 2- What will the proposed change negatively impact?  It seems that all we
> > are talking about is respecting batch length if arrays happen to be
> > longer.
>
> I'm suggesting that we help you solve the post-read truncation problem
> without modifying the IPC protocol. If you want things to work for users
> who are unaware of the truncation, I think this can be achieved through
> a plug-in API that lets you register a metadata handler callback to apply
> the truncation to the record batches.
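
For discussion, here is a rough sketch of what such a handler-callback
plug-in might look like.  Everything in it is an assumption for
illustration: DecodedBatch, HandlerRegistry, TruncateToUtilized and the
"utilized_rows" key are made-up names, not an existing Arrow API.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for a decoded record batch plus its custom metadata.
struct DecodedBatch {
  int64_t num_rows = 0;  // rows as read from the IPC message
  std::map<std::string, std::string> custom_metadata;
};

using MetadataHandler = std::function<void(DecodedBatch&, const std::string&)>;

class HandlerRegistry {
 public:
  // Associates a handler with one custom-metadata key.
  void Register(const std::string& key, MetadataHandler handler) {
    assert(static_cast<bool>(handler));
    handlers_[key] = std::move(handler);
  }

  // Would be called by the (hypothetical) reader after each batch is decoded.
  void Apply(DecodedBatch& batch) const {
    for (const auto& entry : handlers_) {
      auto it = batch.custom_metadata.find(entry.first);
      if (it != batch.custom_metadata.end()) {
        entry.second(batch, it->second);
      }
    }
  }

 private:
  std::map<std::string, MetadataHandler> handlers_;
};

// Example handler: truncate to the row count stored under a made-up key.
inline void TruncateToUtilized(DecodedBatch& batch, const std::string& value) {
  const int64_t utilized = std::stoll(value);
  if (utilized >= 0 && utilized < batch.num_rows) {
    batch.num_rows = utilized;
  }
}
```

Batches without the key pass through untouched, so a reader that never
registers the handler behaves exactly as it does today.
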
>
> > Thanks,
> > -John
> >
> > On Wed, Oct 16, 2019 at 8:37 AM Wes McKinney <wesmck...@gmail.com> wrote:
> >
> > > hi John,
> > >
> > > > As a practical matter, the reason metadata is not a good solution for me
> > > > is that it requires awareness on the part of the reader.  I want (e.g.) a
> > > > researcher in Python to be able to map a file of batches in IPC format
> > > > without needing to worry about the fact that the file was built in a
> > > > streaming fashion and therefore has some unused array elements.
> > >
> > > I don't find this argument to be persuasive.
> > >
> > > pyarrow is intended as a developer-facing library, not a user-facing
> > > one. I don't think you should be having the kinds of users you are
> > > describing using pyarrow directly, instead consuming the library
> > > through a layer above it. Specifically, we are deliberately avoiding
> > > doing anything too "smart" or "magical", instead maintaining tight
> > > developer control over what is going on.
> > >
> > > - Wes
> > >
> > > On Wed, Oct 16, 2019 at 2:18 AM Micah Kornfield <emkornfi...@gmail.com> wrote:
> > > >
> > > > Still thinking through the implications here, but to save others from
> > > > having to go search, [1] is the PR.
> > > >
> > > > [1] https://github.com/apache/arrow/pull/5663/files
> > > >
> > > > On Tue, Oct 15, 2019 at 1:42 PM John Muehlhausen <j...@jgm.org> wrote:
> > > >
> > > > > > A proposal with linked PR now exists in ARROW-5916 and Wes commented that
> > > > > > we should kick it around some more.
> > > > > >
> > > > > > The high-level topic is how Apache Arrow intersects with streaming
> > > > > > methodologies:
> > > > > >
> > > > > > If record batches are strictly immutable, a difficult trade-off is created
> > > > > > for streaming data collection: either I can have low-latency presentation
> > > > > > of new data by appending very small batches (often 1 row) to the IPC stream
> > > > > > and lose columnar layout benefits, or I can have high-latency presentation
> > > > > > of new data by waiting to append a batch until it is large enough to gain
> > > > > > significant columnar layout benefits.  During this waiting period the new
> > > > > > data is unavailable to processing.
> > > > >
> > > > > > If, on the other hand, [0,length) of a batch is immutable but length may
> > > > > > increase, the trade-off is eliminated: I can pre-allocate a batch and
> > > > > > populate records in it when they occur (without waiting), and also gain
> > > > > > columnar benefits as each "closed" batch will be large.  (A batch may be
> > > > > > practically "closed" before the arrays are full when the projection of
> > > > > > variable-length buffer space is wrong... a space/time tradeoff in favor of
> > > > > > time.)
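
A minimal in-process sketch of the "[0,length) is immutable but length may
increase" idea, for a single fixed-width column and a single writer.
GrowingBatch is a hypothetical name, and the real use case involves
memory-mapped buffers shared between processes, which this does not
attempt to model.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical single-writer, append-only batch with one int64 column.
class GrowingBatch {
 public:
  explicit GrowingBatch(std::size_t capacity) : values_(capacity), length_(0) {}

  // Writer side: fill the next slot, then publish the new length.
  // Returns false when the batch is full ("closed").
  bool Append(int64_t value) {
    const std::size_t n = length_.load(std::memory_order_relaxed);
    if (n == values_.size()) return false;
    values_[n] = value;                               // slot not yet visible
    length_.store(n + 1, std::memory_order_release);  // rows [0, n] now visible
    return true;
  }

  // Reader side: rows [0, length()) never change once observed.
  std::size_t length() const { return length_.load(std::memory_order_acquire); }

  // Example columnar computation over the populated prefix only.
  int64_t Sum() const {
    const std::size_t n = length();
    int64_t total = 0;
    for (std::size_t i = 0; i < n; ++i) total += values_[i];
    return total;
  }

 private:
  std::vector<int64_t> values_;  // pre-allocated once, never resized
  std::atomic<std::size_t> length_;
};
```

The release/acquire pair on the length is what lets a reader process new
rows immediately without ever seeing a partially written slot.
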
> > > > >
> > > > > > Looking ahead to a day when the reference implementation(s) will be able to
> > > > > > bump RecordBatch.length while populating pre-allocated records
> > > > > > in-place, ARROW-5916 reads such batches by ignoring portions of arrays that
> > > > > > are beyond RecordBatch.length.
> > > > > >
> > > > > > If we are not looking ahead to such a day, the discussion is about the
> > > > > > alternative way that Arrow will avoid the latency/locality tradeoff
> > > > > > inherent in streaming data collection.  Or, if the answer is "streaming
> > > > > > apps are and will always be out of scope", that idea needs to be defended
> > > > > > from the observation that practitioners are moving more towards the fusion
> > > > > > of batch and streaming, not away from it.
> > > > > >
> > > > > > As a practical matter, the reason metadata is not a good solution for me is
> > > > > > that it requires awareness on the part of the reader.  I want (e.g.) a
> > > > > > researcher in Python to be able to map a file of batches in IPC format
> > > > > > without needing to worry about the fact that the file was built in a
> > > > > > streaming fashion and therefore has some unused array elements.
> > > > > >
> > > > > > The change itself seems relatively simple.  What negative consequences do
> > > > > > we anticipate, if any?
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > > On Fri, Jul 5, 2019 at 10:42 AM John Muehlhausen <j...@jgm.org> wrote:
> > > > >
> > > > > > This seems to help... still testing it though.
> > > > > >
> > > > > > >   Status GetFieldMetadata(int field_index, ArrayData* out) {
> > > > > > >     auto nodes = metadata_->nodes();
> > > > > > >     // pop off a field
> > > > > > >     if (field_index >= static_cast<int>(nodes->size())) {
> > > > > > >       return Status::Invalid("Ran out of field metadata, likely malformed");
> > > > > > >     }
> > > > > > >     const flatbuf::FieldNode* node = nodes->Get(field_index);
> > > > > > >
> > > > > > >     // Changed: take the length from the RecordBatch rather than from the
> > > > > > >     // FieldNode, so array elements beyond RecordBatch.length are ignored.
> > > > > > >     // out->length = node->length();
> > > > > > >     out->length = metadata_->length();
> > > > > > >     out->null_count = node->null_count();
> > > > > > >     out->offset = 0;
> > > > > > >     return Status::OK();
> > > > > > >   }
> > > > > >
> > > > > > > On Fri, Jul 5, 2019 at 10:24 AM John Muehlhausen <j...@jgm.org> wrote:
> > > > > >
> > > > > >> So far it seems as if pyarrow is completely ignoring the
> > > > > >> RecordBatch.length field.  More info to follow...
> > > > > >>
> > > > > > >> On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen <j...@jgm.org> wrote:
> > > > > > >>
> > > > > > >>> Crikey! I'll do some testing around that and suggest some test cases to
> > > > > > >>> ensure it continues to work, assuming that it does.
> > > > > >>>
> > > > > >>> -John
> > > > > >>>
> > > > > > >>> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> Thanks for the attachment, it's helpful.
> > > > > >>>>
> > > > > > >>>> On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen <j...@jgm.org> wrote:
> > > > > > >>>> >
> > > > > > >>>> > Attachments referred to in previous two messages:
> > > > > > >>>> >
> > > > > > >>>> > https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0
> > > > > > >>>> >
> > > > > > >>>> > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen <j...@jgm.org> wrote:
> > > > > >>>> >
> > > > > > >>>> > > Thanks, Wes, for the thoughtful reply.  I really appreciate the
> > > > > > >>>> > > engagement.  In order to clarify things a bit, I am attaching a graphic of
> > > > > > >>>> > > how our application will take record-wise (row-oriented) data from an event
> > > > > > >>>> > > source and incrementally populate a pre-allocated Arrow-compatible buffer,
> > > > > > >>>> > > including for variable-length fields.  (Obviously at this stage I am not
> > > > > > >>>> > > using the reference implementation Arrow code, although that would be a
> > > > > > >>>> > > goal.... to contribute that back to the project.)
> > > > > > >>>> > >
> > > > > > >>>> > > For sake of simplicity these are non-nullable fields.  As a result a
> > > > > > >>>> > > reader of "y" that has no knowledge of the "utilized" metadata would get a
> > > > > > >>>> > > long string (zeros, spaces, uninitialized, or whatever we decide for the
> > > > > > >>>> > > pre-allocation model) for the record just beyond the last utilized record.
> > > > > >>>> > >
> > > > > > >>>> > > I don't see any "big O"-analysis problems with this approach.  The
> > > > > > >>>> > > space/time tradeoff is that we have to guess how much room to allocate for
> > > > > > >>>> > > variable-length fields.  We will probably almost always be wrong.  This
> > > > > > >>>> > > ends up in "wasted" space.  However, we can do calculations based on these
> > > > > > >>>> > > partially filled batches that take full advantage of the columnar layout.
> > > > > > >>>> > > (Here I've shown the case where we had too little variable-length buffer
> > > > > > >>>> > > set aside, resulting in "wasted" rows.  The flip side is that rows achieve
> > > > > > >>>> > > full [1] utilization but there is wasted variable-length buffer if we guess
> > > > > > >>>> > > incorrectly in the other direction.)
> > > > > > >>>> > >
> > > > > > >>>> > > I proposed a few things that are "nice to have" but really what I'm eyeing
> > > > > > >>>> > > is the ability for a reader-- any reader (e.g. pyarrow)-- to see that some
> > > > > > >>>> > > of the rows in a RecordBatch are not to be read, based on the new
> > > > > > >>>> > > "utilized" (or whatever name) metadata.  That single tweak to the
> > > > > > >>>> > > metadata-- and readers honoring it-- is the core of the proposal.
> > > > > > >>>> > > (Proposal 4.)  This would indicate that the attached example (or something
> > > > > > >>>> > > similar) is the blessed approach for those seeking to accumulate events and
> > > > > > >>>> > > process them while still expecting more data, with the heavier-weight task
> > > > > > >>>> > > of creating a new pre-allocated batch being a rare occurrence.
> > > > > >>>> > >
> > > > > >>>>
> > > > > > >>>> So the "length" field in RecordBatch is already the utilized number of
> > > > > > >>>> rows. The body buffers can certainly have excess unused space. So your
> > > > > > >>>> application can mutate Flatbuffer "length" field in-place as new
> > > > > > >>>> records are filled in.
> > > > > >>>>
> > > > > > >>>> > > Notice that the mutability is only in the sense of "appending."  The
> > > > > > >>>> > > current doctrine of total immutability would be revised to refer to the
> > > > > > >>>> > > immutability of only the already-populated rows.
> > > > > > >>>> > >
> > > > > > >>>> > > It gives folks an option other than choosing the lesser of two evils: on
> > > > > > >>>> > > the one hand, length 1 RecordBatches that don't result in a stream that is
> > > > > > >>>> > > computationally efficient.  On the other hand, adding artificial latency by
> > > > > > >>>> > > accumulating events before "freezing" a larger batch and only then making
> > > > > > >>>> > > it available to computation.
> > > > > > >>>> > >
> > > > > > >>>> > > -John
> > > > > > >>>> > >
> > > > > > >>>> > > On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > > > > > >>>> > >
> > > > > > >>>> > >> hi John,
> > > > > > >>>> > >>
> > > > > > >>>> > >> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen <j...@jgm.org> wrote:
> > > > > >>>> > >> >
> > > > > > >>>> > >> > During my time building financial analytics and trading systems (23
> > > > > > >>>> > >> > years!), both the "batch processing" and "stream processing" paradigms have
> > > > > > >>>> > >> > been extensively used by myself and by colleagues.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Unfortunately, the tools used in these paradigms have not successfully
> > > > > > >>>> > >> > overlapped.  For example, an analyst might use a Python notebook with
> > > > > > >>>> > >> > pandas to do some batch analysis.  Then, for acceptable latency and
> > > > > > >>>> > >> > throughput, a C++ programmer must implement the same schemas and processing
> > > > > > >>>> > >> > logic in order to analyze real-time data for real-time decision support.
> > > > > > >>>> > >> > (Time horizons often being sub-second or even sub-millisecond for an
> > > > > > >>>> > >> > acceptable reaction to an event.  The most aggressive software-based
> > > > > > >>>> > >> > systems, leaving custom hardware aside other than things like kernel-bypass
> > > > > > >>>> > >> > NICs, target 10s of microseconds for a full round trip from data ingestion
> > > > > > >>>> > >> > to decision.)
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > As a result, TCO is more than doubled.  A doubling can be accounted for
> > > > > > >>>> > >> > by two implementations that share little or nothing in the way of
> > > > > > >>>> > >> > architecture.  Then additional effort is required to ensure that these
> > > > > > >>>> > >> > implementations continue to behave the same way and are upgraded in
> > > > > > >>>> > >> > lock-step.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Arrow purports to be a "bridge" technology that eases one of the pain
> > > > > > >>>> > >> > points of working in different ecosystems by providing a common event
> > > > > > >>>> > >> > stream data structure.  (Discussion of common processing techniques is
> > > > > > >>>> > >> > beyond the scope of this discussion.  Suffice it to say that a streaming
> > > > > > >>>> > >> > algo can always be run in batch, but not vice versa.)
> > > > > >>>> > >> >
> > > > > > >>>> > >> > Arrow seems to be growing up primarily in the batch processing world.
> > > > > > >>>> > >> > One publication notes that "the missing piece is streaming, where the
> > > > > > >>>> > >> > velocity of incoming data poses a special challenge.  There are some early
> > > > > > >>>> > >> > experiments to populate Arrow nodes in microbatches..." [1]  Part of our
> > > > > > >>>> > >> > discussion could be a response to this observation.  In what ways is it
> > > > > > >>>> > >> > true or false?  What are the plans to remedy this shortcoming, if it
> > > > > > >>>> > >> > exists?  What steps can be taken now to ease the transition to low-latency
> > > > > > >>>> > >> > streaming support in the future?
> > > > > >>>> > >> >
> > > > > >>>> > >>
> > > > > > >>>> > >> Arrow columnar format describes a collection of records with values
> > > > > > >>>> > >> between records being placed adjacent to each other in memory. If you
> > > > > > >>>> > >> break that assumption, you don't have a columnar format anymore. So I
> > > > > > >>>> > >> don't see where the "shortcoming" is. We don't have any software in the
> > > > > > >>>> > >> project for managing the creation of record batches in a streaming
> > > > > > >>>> > >> application, but this seems like an interesting development expansion
> > > > > > >>>> > >> area for the project.
> > > > > > >>>> > >>
> > > > > > >>>> > >> Note that many contributors have already expanded the surface area of
> > > > > > >>>> > >> what's in the Arrow libraries in many directions.
> > > > > > >>>> > >>
> > > > > > >>>> > >> Streaming data collection is yet another area of expansion, but
> > > > > > >>>> > >> _personally_ it is not on the short list of projects that I will
> > > > > > >>>> > >> be working on (or asking my direct or indirect colleagues
> > > > > > >>>> > >> to work on). Since this is a project made up of volunteers, it's up to
> > > > > > >>>> > >> contributors to drive new directions for the project by writing design
> > > > > > >>>> > >> documents and pull requests.
> > > > > >>>> > >>
> > > > > > >>>> > >> > In my own experience, a successful strategy for stream processing where
> > > > > > >>>> > >> > context (i.e. recent past events) must be considered by calculations is to
> > > > > > >>>> > >> > pre-allocate memory for event collection, to organize this memory in a
> > > > > > >>>> > >> > columnar layout, and to run incremental calculations at each event ingress
> > > > > > >>>> > >> > into the partially populated memory.  [Fig 1]  When the pre-allocated
> > > > > > >>>> > >> > memory has been exhausted, allocate a new batch of column-wise memory and
> > > > > > >>>> > >> > continue.  When a batch is no longer pertinent to the calculation look-back
> > > > > > >>>> > >> > window, free the memory back to the heap or pool.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Here we run into the first philosophical barrier with Arrow, where
> > > > > > >>>> > >> > "Arrow data is immutable." [2]  There is currently little or no
> > > > > > >>>> > >> > consideration for reading a partially constructed RecordBatch, e.g. one
> > > > > > >>>> > >> > with only some of the rows containing event data at the present moment in
> > > > > > >>>> > >> > time.
> > > > > >>>> > >> >
> > > > > >>>> > >>
> > > > > > >>>> > >> It seems like the use case you have heavily revolves around mutating
> > > > > > >>>> > >> pre-allocated, memory-mapped datasets that are being consumed by other
> > > > > > >>>> > >> processes on the same host. So you want to incrementally fill some
> > > > > > >>>> > >> memory-mapped data that you've already exposed to another process.
> > > > > > >>>> > >>
> > > > > > >>>> > >> Because of the memory layout for variable-size and nested cells, it is
> > > > > > >>>> > >> impossible in general to mutate Arrow record batches. This is not a
> > > > > > >>>> > >> philosophical position: this was a deliberate technical decision to
> > > > > > >>>> > >> guarantee data locality for scans and predictable O(1) random access
> > > > > > >>>> > >> on variable-length and nested data.
> > > > > > >>>> > >>
> > > > > > >>>> > >> Technically speaking, you can mutate memory in-place for fixed-size
> > > > > > >>>> > >> types in-RAM or on-disk, if you want to. It's an "off-label" use case
> > > > > > >>>> > >> but no one is saying you can't do this.
> > > > > >>>> > >>
> > > > > > >>>> > >> > Proposal 1: Shift the Arrow "immutability" doctrine to apply to
> > > > > > >>>> > >> > populated records of a RecordBatch instead of to all records?
> > > > > > >>>> > >> >
> > > > > > >>>> > >>
> > > > > > >>>> > >> Per above, this is impossible in generality. You can't alter
> > > > > > >>>> > >> variable-length or nested records without rewriting the record batch.
> > > > > >>>> > >>
> > > > > > >>>> > >> > As an alternative approach, RecordBatch can be used as a single Record
> > > > > > >>>> > >> > (batch length of one).  [Fig 2]  In this approach the benefit of the
> > > > > > >>>> > >> > columnar layout is lost for look-back window processing.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Another alternative approach is to collect an entire RecordBatch before
> > > > > > >>>> > >> > stepping through it with the stream processing calculation.  [Fig 3]  With
> > > > > > >>>> > >> > this approach some columnar processing benefit can be recovered, however
> > > > > > >>>> > >> > artificial latency is introduced.  As tolerance for delays in decision
> > > > > > >>>> > >> > support dwindles, this model will be of increasingly limited value.  It is
> > > > > > >>>> > >> > already unworkable in many areas of finance.
> > > > > >>>> > >> >
> > > > > > >>>> > >> > When considering the Arrow format and variable length values such as
> > > > > > >>>> > >> > strings, the pre-allocation approach (and subsequent processing of a
> > > > > > >>>> > >> > partially populated batch) encounters a hiccup.  How do we know the amount
> > > > > > >>>> > >> > of buffer space to pre-allocate?  If we allocate too much buffer for
> > > > > > >>>> > >> > variable-length data, some of it will be unused.  If we allocate too little
> > > > > > >>>> > >> > buffer for variable-length data, some row entities will be unusable.
> > > > > > >>>> > >> > (Additional "rows" remain but when populating string fields there is no
> > > > > > >>>> > >> > longer string storage space to point them to.)
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > As with many optimization space/time tradeoff problems, the solution
> > > > > > >>>> > >> > seems to be to guess.  Pre-allocation sets aside variable length buffer
> > > > > > >>>> > >> > storage based on the typical "expected size" of the variable length data.
> > > > > > >>>> > >> > This can result in some unused rows, as discussed above.  [Fig 4]  In fact
> > > > > > >>>> > >> > it will necessarily result in one unused row unless the last of each
> > > > > > >>>> > >> > variable length field in the last row exactly fits into the remaining space
> > > > > > >>>> > >> > in the variable length data buffer.  Consider the case where there is more
> > > > > > >>>> > >> > variable length buffer space than data:
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Given variable-length field x, last row index of y, variable length
> > > > > > >>>> > >> > buffer v, beginning offset into v of o:
> > > > > > >>>> > >> >     x[y] begins at o
> > > > > > >>>> > >> >     x[y] ends at the offset of the next record, there is no next
> > > > > > >>>> > >> >     record, so x[y] ends after the total remaining area in variable length
> > > > > > >>>> > >> >     buffer... however, this is too much!
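
For reference, the Arrow variable-size binary layout stores length + 1
offsets, so the end of the last value is always explicit.  The sketch
below shows a pre-allocated string column built on that convention, where
either rows or bytes can run out first.  PreallocatedStringColumn and its
methods are hypothetical names for illustration, not Arrow classes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical pre-allocated string column: fixed row capacity and fixed
// variable-length byte capacity, both guessed up front.
class PreallocatedStringColumn {
 public:
  PreallocatedStringColumn(std::size_t row_capacity, std::size_t byte_capacity)
      : offsets_(row_capacity + 1, 0), data_(byte_capacity, '\0'), length_(0) {}

  // Returns false when either rows or byte space run out; the batch would
  // then be "closed" with some rows or some bytes left unused.
  bool Append(const std::string& value) {
    if (length_ + 1 >= offsets_.size()) return false;       // no rows left
    const std::size_t begin = static_cast<std::size_t>(offsets_[length_]);
    if (begin + value.size() > data_.size()) return false;  // no bytes left
    data_.replace(begin, value.size(), value);
    offsets_[length_ + 1] = static_cast<int32_t>(begin + value.size());
    ++length_;
    return true;
  }

  // Value i spans [offsets[i], offsets[i + 1]), including for the last row.
  std::string Get(std::size_t i) const {
    assert(i < length_);
    const std::size_t begin = static_cast<std::size_t>(offsets_[i]);
    const std::size_t end = static_cast<std::size_t>(offsets_[i + 1]);
    return data_.substr(begin, end - begin);
  }

  std::size_t length() const { return length_; }
  std::size_t unused_rows() const { return offsets_.size() - 1 - length_; }
  std::size_t unused_bytes() const {
    return data_.size() - static_cast<std::size_t>(offsets_[length_]);
  }

 private:
  std::vector<int32_t> offsets_;  // row_capacity + 1 entries
  std::string data_;              // pre-allocated variable-length bytes
  std::size_t length_;            // utilized rows
};
```
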
> > > > > >>>> > >> >
> > > > > >>>> > >>
> > > > > > >>>> > >> It isn't clear to me what you're proposing. It sounds like you want a
> > > > > > >>>> > >> major redesign of the columnar format to permit in-place mutation of
> > > > > > >>>> > >> strings. I doubt that would be possible at this point.
> > > > > >>>> > >>
> > > > > > >>>> > >> > Proposal 2: [low priority] Create an "expected length" statistic in the
> > > > > > >>>> > >> > Schema for variable length fields?
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Proposal 3: [low priority] Create metadata to store the index into
> > > > > > >>>> > >> > variable-length data that represents the end of the value for the last
> > > > > > >>>> > >> > record?  Alternatively: a row is "wasted," however pre-allocation is
> > > > > > >>>> > >> > inexact to begin with.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Proposal 4: Add metadata to indicate to a RecordBatch reader that only
> > > > > > >>>> > >> > some of the rows are to be utilized.  [Fig 5]  This is useful not only when
> > > > > > >>>> > >> > processing a batch that is still under construction, but also for "closed"
> > > > > > >>>> > >> > batches that were not able to be fully populated due to an imperfect
> > > > > > >>>> > >> > projection of variable length storage.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > On this last proposal, Wes has weighed in:
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > "I believe your use case can be addressed by pre-allocating record
> > > > > > >>>> > >> > batches and maintaining application level metadata about what portion of
> > > > > > >>>> > >> > the record batches has been 'filled' (so the unfilled records can be
> > > > > > >>>> > >> > dropped by slicing). I don't think any change to the binary protocol is
> > > > > > >>>> > >> > warranted." [3]
> > > > > >>>> > >> >
> > > > > >>>> > >>
> > > > > > >>>> > >> My personal opinion is that a solution to the problem you have can be
> > > > > > >>>> > >> composed from the components (combined with some new pieces of code)
> > > > > > >>>> > >> that we have developed in the project already.
> > > > > > >>>> > >>
> > > > > > >>>> > >> So the "application level" could be an add-on C++ component in the
> > > > > > >>>> > >> Apache Arrow project. Call it a "memory-mapped streaming data
> > > > > > >>>> > >> collector" that pre-allocates on-disk record batches (of only
> > > > > > >>>> > >> fixed-size or even possibly dictionary-encoded types) and then fills
> > > > > > >>>> > >> them incrementally as bits of data come in, updating some auxiliary
> > > > > > >>>> > >> metadata that other processes can use to determine what portion of the
> > > > > > >>>> > >> Arrow IPC messages to "slice off".
> > > > > >>>> > >>
> > > > > >>>> > >> > Concerns with positioning this at the app level:
> > > > > >>>> > >> >
> > > > > > >>>> > >> > 1- Do we need to address or begin to address the overall concern of how
> > > > > > >>>> > >> > Arrow data structures are to be used in "true" (non-microbatch) streaming
> > > > > > >>>> > >> > environments, cf [1] in the last paragraph, as a *first-class* usage
> > > > > > >>>> > >> > pattern?  If so, is now the time?
> > > > > >>>> > >> >if you break that design invariant you don't have a
> columnar
> > > > > >>>> format
> > > > > >>>> > >> anymore.
> > > > > >>>> > >>
> > > > > > >>>> > >> Arrow provides a binary protocol for describing payload data on the
> > > > > > >>>> > >> wire (or on-disk, or in-memory, all the same). I don't see how it is
> > > > > > >>>> > >> in conflict with streaming environments, unless the streaming
> > > > > > >>>> > >> application has difficulty collecting multiple records into Arrow
> > > > > > >>>> > >> record batches. In that case, it's a system trade-off. Currently
> > > > > > >>>> > >> people are using Avro with Kafka and sending one record at a time, but
> > > > > > >>>> > >> then they're also spending a lot of CPU cycles in serialization.
> > > > > >>>> > >>
> > > > > > >>>> > >> > 2- If we can even make broad-stroke attempts at data structure features
> > > > > > >>>> > >> > that are likely to be useful when streaming becomes a first class citizen,
> > > > > > >>>> > >> > it reduces the chances of "breaking" format changes in the future.  I do
> > > > > > >>>> > >> > not believe the proposals place an undue hardship on batch processing
> > > > > > >>>> > >> > paradigms.  We are currently discussing making a breaking change to the IPC
> > > > > > >>>> > >> > format [4], so there is a window of opportunity to consider features useful
> > > > > > >>>> > >> > for streaming?  (Current clients can feel free to ignore the proposed
> > > > > > >>>> > >> > "utilized" metadata of RecordBatch.)
> > > > > >>>> > >> >
> > > > > >>>> > >>
> > > > > > >>>> > >> I think the perception that streaming is not a first class citizen is
> > > > > > >>>> > >> an editorialization (e.g. the article you cited was an editorial
> > > > > > >>>> > >> written by an industry analyst based on an interview with Jacques and
> > > > > > >>>> > >> me). Columnar data formats in general are designed to work with more
> > > > > > >>>> > >> than one value at a time (which we are calling a "batch" but I think
> > > > > > >>>> > >> that's conflating terminology with the "batch processing" paradigm of
> > > > > > >>>> > >> Hadoop, etc.).
> > > > > >>>> > >>
> > > > > >>>> > >> > 3- Part of the promise of Arrow is that applications
> are
> > > not a
> > > > > >>>> world
> > > > > >>>> > >> unto themselves, but interoperate with other
> Arrow-compliant
> > > > > >>>> systems.  In
> > > > > >>>> > >> my case, I would like users to be able to examine
> > > RecordBatches in
> > > > > >>>> tools
> > > > > >>>> > >> such as pyarrow without needing to be aware of any
> streaming
> > > > > >>>> app-specific
> > > > > >>>> > >> metadata.  For example, a researcher may pull in an IPC
> > > "File"
> > > > > >>>> containing N
> > > > > >>>> > >> RecordBatch messages corresponding to those in Fig 4.  I
> > > would
> > > > > >>>> very much
> > > > > >>>> > >> like for this casual user to not have to apply N slice
> > > operations
> > > > > >>>> based on
> > > > > >>>> > >> out-of-band data to get to the data that is relevant.
> > > > > >>>> > >> >
> > > > > >>>> > >>
> > > > > >>>> > >> Per above, should this become a standard enough use
> case, I
> > > think
> > > > > >>>> that
> > > > > >>>> > >> code can be developed in the Apache project to address
> it.
> > > > > >>>> > >>
> > > > > >>>> > >> > Devil's advocate:
> > > > > >>>> > >> >
> > > > > >>>> > >> > 1- Concurrent access to a mutable (growing) RecordBatch
> > > will
> > > > > >>>> require
> > > > > >>>> > >> synchronization of some sort to get consistent metadata
> > > reads.
> > > > > >>>> Since the
> > > > > >>>> > >> above proposals do not specify how this synchronization
> will
> > > > > occur
> > > > > >>>> for
> > > > > >>>> > >> tools such as pyarrow (we can imagine a Python user
> getting
> > > > > >>>> synchronized
> > > > > >>>> > >> access to File metadata and mapping a read-only area
> before
> > > the
> > > > > >>>> writer is
> > > > > >>>> > >> allowed to continue "appending" to this batch, or
> batches to
> > > this
> > > > > >>>> File),
> > > > > >>>> > >> some "unusual" code will be required anyway, so what is
> the
> > > harm
> > > > > of
> > > > > >>>> > >> consulting side-band data for slicing all the batches as
> > > part of
> > > > > >>>> this
> > > > > >>>> > >> "unusual" code?  [Potential response: Yes, but it is
> still
> > > one
> > > > > >>>> less thing
> > > > > >>>> > >> to worry about, and perhaps first-class support for
> common
> > > > > >>>> synchronization
> > > > > >>>> > >> patterns can be forthcoming?  These patterns may not
> require
> > > > > >>>> further format
> > > > > >>>> > >> changes?]
> > > > > >>>> > >> >
> > > > > >>>> > >> > My overall concern is that I see a lot of wasted effort
> > > dealing
> > > > > >>>> with
> > > > > >>>> > >> the "impedance mismatch" between batch oriented and
> streaming
> > > > > >>>> systems.  I
> > > > > >>>> > >> believe that "best practices" will begin (and continue!)
> to
> > > > > prefer
> > > > > >>>> tools
> > > > > >>>> > >> that help bridge the gap.  Certainly this is the case in
> my
> > > own
> > > > > >>>> work.  I
> > > > > >>>> > >> agree with the appraisal at the end of the ZDNet article.
> > > If the
> > > > > >>>> above is
> > > > > >>>> > >> not a helpful solution, what other steps can be made?
> Or if
> > > > > Arrow
> > > > > >>>> is
> > > > > >>>> > >> intentionally confined to batch processing for the
> > > foreseeable
> > > > > >>>> future (in
> > > > > >>>> > >> terms of first-class support), I'm interested in the
> > > rationale.
> > > > > >>>> Perhaps
> > > > > >>>> > >> the feeling is that we avoid scope creep now (which I
> > > understand
> > > > > >>>> can be
> > > > > >>>> > >> never-ending) even if it means a certain breaking change
> in
> > > the
> > > > > >>>> future?
> > > > > >>>> > >> >
> > > > > >>>> > >>
> > > > > >>>> > >> There are some semantic issues with what "streaming" and
> > > "batch"
> > > > > >>>> mean.
> > > > > >>>> > >> When people see "streaming" nowadays they think "Kafka"
> (or
> > > > > >>>> > >> Kafka-like). Single events flow in and out of streaming
> > > > > computation
> > > > > >>>> > >> nodes (e.g. like
> https://apache.github.io/incubator-heron/
> > > or
> > > > > >>>> others).
> > > > > >>>> > >> The "streaming" is more about computational semantics
> than
> > > data
> > > > > >>>> > >> representation.
> > > > > >>>> > >>
> > > > > >>>> > >> The Arrow columnar format fundamentally deals with
> multiple
> > > > > >>>> records at
> > > > > >>>> > >> a time (you can have a record batch with size 1, but
> that is
> > > not
> > > > > >>>> going
> > > > > >>>> > >> to be efficient). But I do not think Arrow is
> "intentionally
> > > > > >>>> confined"
> > > > > >>>> > >> to batch processing. If it makes sense to use a columnar
> > > format
> > > > > to
> > > > > >>>> > >> represent data in a streaming application, then you can
> > > certainly
> > > > > >>>> use
> > > > > >>>> > >> it for that. I'm aware of people successfully using Arrow
> > > with
> > > > > >>>> Kafka,
> > > > > >>>> > >> for example.
> > > > > >>>> > >>
> > > > > >>>> > >> - Wes
> > > > > >>>> > >>
> > > > > >>>> > >> > Who else encounters the need to mix/match batch and
> > > streaming,
> > > > > >>>> and what
> > > > > >>>> > >> are your experiences?
> > > > > >>>> > >> >
> > > > > >>>> > >> > Thanks for the further consideration and discussion!
> > > > > >>>> > >> >
> > > > > >>>> > >> > [1] https://zd.net/2H0LlBY
> > > > > >>>> > >> > [2] https://arrow.apache.org/docs/python/data.html
> > > > > >>>> > >> > [3] https://bit.ly/2J5sENZ
> > > > > >>>> > >> > [4] https://bit.ly/2Yske8L
> > > > > >>>> > >>
> > > > > >>>> > >
> > > > > >>>>
> > > > > >>>
> > > > >
> > >
>
>
