During my 23 years building financial analytics and trading systems, my
colleagues and I have made extensive use of both the "batch processing"
and "stream processing" paradigms.

Unfortunately, the tools used in these two paradigms have never
overlapped well.  For example, an analyst might use a Python notebook
with pandas to do some batch analysis.  Then, to achieve acceptable
latency and throughput, a C++ programmer must re-implement the same
schemas and processing logic in order to analyze real-time data for
real-time decision support.  (Time horizons are often sub-second or even
sub-millisecond for an acceptable reaction to an event.  The most
aggressive software-based systems, setting custom hardware aside other
than things like kernel-bypass NICs, target tens of microseconds for a
full round trip from data ingestion to decision.)

As a result, TCO is more than doubled.  The doubling is accounted for by
two implementations that share little or nothing in the way of
architecture.  Additional effort is then required to ensure that the two
implementations continue to behave the same way and are upgraded in
lock-step.

Arrow purports to be a "bridge" technology that eases one of the pain
points of working across different ecosystems by providing a common event
stream data structure.  (Discussion of common processing techniques is
beyond the scope of this post.  Suffice it to say that a streaming algo
can always be run in batch, but not vice versa.)

Arrow seems to be growing up primarily in the batch processing world.  One
publication notes that "the missing piece is streaming, where the velocity
of incoming data poses a special challenge. There are some early
experiments to populate Arrow nodes in microbatches..." [1]  Part of our
discussion could be a response to this observation.  In what ways is it
true or false?  What are the plans to remedy this shortcoming, if it
exists?  What steps can be taken now to ease the transition to low-latency
streaming support in the future?

In my own experience, a successful strategy for stream processing where
calculations must consider context (i.e., recent past events) is to
pre-allocate memory for event collection, to organize this memory in a
columnar layout, and to run incremental calculations over the partially
populated memory at each event ingress.  [Fig 1]  When the pre-allocated
memory has been exhausted, allocate a new batch of column-wise memory and
continue.  When a batch is no longer pertinent to the calculation
look-back window, free the memory back to the heap or pool.
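
To make the pattern concrete, here is a minimal sketch in Python/NumPy;
the field names, batch size, and running-VWAP calculation are
illustrative assumptions, not part of any existing API:

    import numpy as np

    BATCH_ROWS = 4096  # assumed pre-allocated capacity per batch

    class ColumnarBatch:
        # Pre-allocated column-wise storage, filled one event at a time.
        def __init__(self, rows=BATCH_ROWS):
            self.price = np.empty(rows, dtype=np.float64)
            self.size = np.empty(rows, dtype=np.int64)
            self.filled = 0  # number of populated rows

        def append(self, price, size):
            self.price[self.filled] = price
            self.size[self.filled] = size
            self.filled += 1
            return self.filled == len(self.price)  # True when exhausted

    # Incremental calculation at each event ingress, e.g. a running VWAP
    # over the populated prefix of the current batch.
    batch = ColumnarBatch()
    for price, size in [(100.0, 5), (100.5, 3), (99.8, 7)]:
        exhausted = batch.append(price, size)
        p = batch.price[:batch.filled]  # views over populated rows only
        s = batch.size[:batch.filled]
        vwap = (p * s).sum() / s.sum()
        if exhausted:
            batch = ColumnarBatch()  # allocate a new batch and continue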

Here we run into the first philosophical barrier with Arrow: "Arrow data
is immutable." [2]  There is currently little or no consideration given
to reading a partially constructed RecordBatch, e.g. one in which only
some of the rows contain event data at the present moment in time.

Proposal 1: Shift the Arrow "immutability" doctrine to apply to populated
records of a RecordBatch instead of to all records?

As an alternative approach, a RecordBatch can be used as a single Record
(batch length of one).  [Fig 2]  In this approach the benefit of the
columnar layout is lost for look-back window processing.
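
For concreteness, a batch-of-one sketch using pyarrow (the field names
are illustrative):

    import pyarrow as pa

    # Each incoming event becomes its own RecordBatch of length one.
    def event_to_batch(price, size):
        return pa.RecordBatch.from_arrays(
            [pa.array([price], type=pa.float64()),
             pa.array([size], type=pa.int64())],
            names=["price", "size"])

    batch = event_to_batch(100.0, 5)  # num_rows == 1

Each event carries its own per-batch overhead, and look-back calculations
must walk a chain of length-one batches rather than contiguous columns.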

Another alternative approach is to collect an entire RecordBatch before
stepping through it with the stream processing calculation.  [Fig 3]
 With this approach some columnar processing benefit can be recovered;
however, artificial latency is introduced.  As tolerance for delays in
decision support dwindles, this model will be of increasingly limited
value.  It is already unworkable in many areas of finance.
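
A minimal sketch of this collect-then-process pattern in pyarrow (field
names and batch size are illustrative assumptions):

    import pyarrow as pa
    import pyarrow.compute as pc

    pending = {"price": [], "size": []}

    def on_event(price, size, batch_rows=1024):
        pending["price"].append(price)
        pending["size"].append(size)
        if len(pending["price"]) < batch_rows:
            return  # the event waits; this wait is the artificial latency
        batch = pa.RecordBatch.from_arrays(
            [pa.array(pending["price"], type=pa.float64()),
             pa.array(pending["size"], type=pa.int64())],
            names=["price", "size"])
        pending["price"].clear()
        pending["size"].clear()
        # Columnar processing is recovered here, but the first event in
        # the batch has waited for batch_rows - 1 successors to arrive.
        notional = pc.sum(pc.multiply(
            batch.column("price"),
            pc.cast(batch.column("size"), pa.float64())))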

When considering the Arrow format and variable-length values such as
strings, the pre-allocation approach (and subsequent processing of a
partially populated batch) encounters a hiccup: how do we know how much
buffer space to pre-allocate?  If we allocate too much buffer for
variable-length data, some of it will go unused.  If we allocate too
little, some row entities will be unusable.  (Additional "rows" remain,
but when populating string fields there is no longer any string storage
space to point them to.)

As with many space/time tradeoff problems, the solution seems to be to
guess.  Pre-allocation sets aside variable-length buffer storage based on
the typical "expected size" of the variable-length data.  This can result
in some unused rows, as discussed above.  [Fig 4]  In fact, it will
necessarily result in at least one unused row unless the last value of
each variable-length field exactly fills the remaining space in that
field's data buffer.  Consider the case where there is more
variable-length buffer space than data:

Given a variable-length field x, a last populated row index y, a
variable-length value buffer v, and a beginning offset o of x[y] into v:
    x[y] begins at o.
    x[y] ends at the beginning offset of the next record.  There is no
next record, so x[y] is taken to extend through the entire remaining area
of v... but this is too much!
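
A toy sketch of the layout in question: Arrow stores a variable-length
column as an offsets buffer (one more entry than rows) plus a contiguous
value buffer, with value i spanning offsets[i] to offsets[i+1].  The
capacities below are illustrative guesses:

    import numpy as np

    ROWS = 4      # pre-allocated row capacity
    AVG_LEN = 8   # guessed "expected size" per string
    offsets = np.zeros(ROWS + 1, dtype=np.int32)       # offsets buffer
    values = np.zeros(ROWS * AVG_LEN, dtype=np.uint8)  # value buffer
    filled = 0

    def append(s: bytes) -> bool:
        global filled
        start = offsets[filled]
        end = start + len(s)
        if filled == ROWS or end > len(values):
            return False  # out of space: remaining rows are unusable
        values[start:end] = np.frombuffer(s, dtype=np.uint8)
        offsets[filled + 1] = end  # value i spans offsets[i]..offsets[i+1]
        filled += 1
        return True

    append(b"AAPL"); append(b"MSFT")
    # Reading the last populated value needs offsets[filled]; a reader
    # that instead assumed "the last value runs to the end of the value
    # buffer" would sweep in the unused tail -- the over-read described
    # above.
    last = bytes(values[offsets[filled - 1]:offsets[filled]])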

Proposal 2: [low priority] Create an "expected length" statistic in the
Schema for variable length fields?

Proposal 3: [low priority] Create metadata to store the index into
variable-length data that marks the end of the last record's value?
 Alternatively: accept that a row is "wasted"; pre-allocation is inexact
to begin with.

Proposal 4: Add metadata to indicate to a RecordBatch reader that only
some of the rows are to be utilized.  [Fig 5]  This is useful not only
when processing a batch that is still under construction, but also for
"closed" batches that could not be fully populated due to an imperfect
projection of variable-length storage.

On this last proposal, Wes has weighed in:

"I believe your use case can be addressed by pre-allocating record batches
and maintaining application level metadata about what portion of the record
batches has been 'filled' (so the unfilled records can be dropped by
slicing). I don't think any change to the binary protocol is warranted." [3]
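
Concretely, the approach Wes describes looks like this in pyarrow, where
the "filled" count is the application-level, out-of-band metadata (the
4096-row batch below stands in for pre-allocated storage):

    import pyarrow as pa

    batch = pa.RecordBatch.from_arrays(
        [pa.array(list(range(4096)), type=pa.int64())], names=["seq"])
    filled = 1700  # app-level metadata: rows actually populated

    visible = batch.slice(0, filled)  # zero-copy view of populated rows
    assert visible.num_rows == filled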

Concerns with positioning this at the app level:

1- Do we need to address, or begin to address, the overall concern of how
Arrow data structures are to be used in "true" (non-microbatch) streaming
environments (cf. [1], quoted above) as a *first-class* usage pattern?
 If so, is now the time?

2- If we can make even broad-stroke attempts at data structure features
that are likely to be useful when streaming becomes a first-class
citizen, we reduce the chances of "breaking" format changes in the
future.  I do not believe the proposals place an undue hardship on batch
processing paradigms.  We are currently discussing a breaking change to
the IPC format [4], so is there a window of opportunity to consider
features useful for streaming?  (Current clients can feel free to ignore
the proposed "utilized" metadata of RecordBatch.)

3- Part of the promise of Arrow is that applications are not a world unto
themselves, but interoperate with other Arrow-compliant systems.  In my
case, I would like users to be able to examine RecordBatches in tools
such as pyarrow without needing to be aware of any streaming app-specific
metadata.  For example, a researcher may pull in an IPC "File" containing
N RecordBatch messages corresponding to those in Fig 4.  I would very
much like this casual user not to have to apply N slice operations based
on out-of-band data to get to the relevant data.

Devil's advocate:

1- Concurrent access to a mutable (growing) RecordBatch will require
synchronization of some sort to get consistent metadata reads.  The above
proposals do not specify how this synchronization would occur for tools
such as pyarrow (we can imagine a Python user getting synchronized access
to File metadata and mapping a read-only area before the writer is
allowed to continue "appending" to this batch, or batches to this File).
 Some "unusual" code will be required anyway, so what is the harm in
consulting side-band data for slicing all the batches as part of this
"unusual" code?  [Potential response: Yes, but it is still one less thing
to worry about, and perhaps first-class support for common
synchronization patterns can be forthcoming?  Those patterns may not
require further format changes; a sketch of one appears below.]
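
For illustration, a sketch of one such pattern: a single writer over a
pre-allocated column, with a published "filled" counter as the only
synchronized state.  (CPython's GIL and the lock stand in for the
release/acquire ordering a C++ implementation would need from
std::atomic; all names here are hypothetical.)

    import threading
    import numpy as np

    col = np.empty(4096, dtype=np.float64)  # pre-allocated column
    filled = 0                              # published row count
    lock = threading.Lock()                 # guards counter publication

    def writer(events):
        global filled
        for i, v in enumerate(events):
            col[i] = v          # write the row first...
            with lock:
                filled = i + 1  # ...then publish it

    def reader():
        with lock:
            n = filled          # snapshot the published prefix
        return col[:n].sum()    # rows [0, n) are immutable once published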

My overall concern is that I see a lot of wasted effort dealing with the
"impedance mismatch" between batch-oriented and streaming systems.  I
believe that "best practices" will begin (and continue!) to prefer tools
that help bridge the gap.  Certainly this is the case in my own work.  I
agree with the appraisal at the end of the ZDNet article.  If the above
is not a helpful solution, what other steps can be taken?  Or, if Arrow
is intentionally confined to batch processing for the foreseeable future
(in terms of first-class support), I am interested in the rationale.
 Perhaps the feeling is that we should avoid scope creep now (which I
understand can be never-ending) even if it means a certain breaking
change in the future?

Who else encounters the need to mix/match batch and streaming, and what are
your experiences?

Thanks for the further consideration and discussion!

[1] https://zd.net/2H0LlBY
[2] https://arrow.apache.org/docs/python/data.html
[3] https://bit.ly/2J5sENZ
[4] https://bit.ly/2Yske8L
