LantaoJin opened a new pull request, #98:
URL: https://github.com/apache/datafusion-java/pull/98
## Which issue does this PR close?
- Closes #95.
## Rationale for this change
PR **#65** shipped a Java-implemented `TableProvider` and
`SessionContext.registerTable(String, TableProvider)`. That covered the
**pull** shape: DataFusion calls `scan(BufferAllocator)` and reads the returned
`ArrowReader`. For Java callers who already have batches in memory, or who can
produce them on demand, that's the right surface.
It does not cover the **push** shape that event-driven batch sources need:
- A coordinator reducing over shard responses arriving incrementally.
- A Flight stream feeding into a query.
- Any in-process producer that emits batches as side-effects of other work
and doesn't know in advance how many will arrive.
Bridging these into PR #65 today requires writing a `BlockingArrowReader`
adapter that buffers pushed batches and serves them through the pull interface.
That's a serialisation point: the producer blocks waiting for
`loadNextBatch()`, or DataFusion blocks waiting for the next batch -- the two
ends can never run truly concurrently. The adapter also has to invent its own
backpressure, error propagation, end-of-stream signalling, and thread-safety
story.
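To make the serialisation point concrete, here is a minimal sketch of the kind of adapter described above, written against plain `java.util.concurrent` types rather than Arrow. The class `QueueBackedReader` and its methods `push`, `finish`, and `loadNext` are hypothetical names chosen for illustration; they are not part of PR #65 or of this PR. The sketch assumes a single consumer thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical pull adapter over a bounded queue; illustrative only.
final class QueueBackedReader<T> {
    private static final Object EOS = new Object(); // end-of-stream marker
    private final BlockingQueue<Object> queue;
    private boolean finished; // touched only by the single consumer thread

    QueueBackedReader(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Producer side: parks while the queue is full (hand-rolled backpressure).
    void push(T batch) {
        put(batch);
    }

    // Producer side: end-of-stream also needs a free slot, so it can park too.
    void finish() {
        put(EOS);
    }

    // Consumer side: parks until something arrives; returns null at end-of-stream.
    @SuppressWarnings("unchecked")
    T loadNext() {
        if (finished) {
            return null;
        }
        try {
            Object next = queue.take();
            if (next == EOS) {
                finished = true;
                return null;
            }
            return (T) next;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a batch", e);
        }
    }

    private void put(Object item) {
        try {
            queue.put(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for capacity", e);
        }
    }
}
```

Even this small version has to pick its own end-of-stream marker and interruption policy, and it has no error channel at all, which is the kind of ad-hoc design the adapter approach forces on every caller.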
DataFusion itself solves this on the Rust side with `StreamingTable` +
`PartitionStream` plus an mpsc channel. This PR surfaces that exact shape from
Java.
## What changes are included in this PR?
New public Java API on `SessionContext` returning a typed sink:
```java
TableSink sink = ctx.registerStreamingTable("shard_results", schema, /* capacity */ 16);

// Producer thread (any thread, including one outside any Tokio runtime and
// one that is itself a Tokio worker -- see "Tokio context detection" below):
try {
    while (hasMoreInput()) {
        sink.write(batch);   // backpressures when channel is full
    }
    sink.close();            // EOF: queries see end-of-stream cleanly
} catch (Throwable t) {
    sink.fail(t);            // signal error: queries see RuntimeException
}
```
```java
// API surface only; method bodies are omitted.
public final class TableSink implements AutoCloseable {
    void write(VectorSchemaRoot batch);
    @Override void close();
    void fail(Throwable cause);
}
```
After registration the table can be queried like any other:
```java
DataFrame df = ctx.sql("SELECT count(*) FROM shard_results");
ArrowReader r = df.collect(allocator);
```
**Single-scan semantics.** The registered table can be queried at most once;
subsequent scans against the same registration throw a `RuntimeException`. This
matches the natural semantics of an event-driven producer (the data is consumed
as it arrives) and avoids buffering every batch internally. Callers who need to
re-scan should use the pull-mode `registerTable` + `SimpleTableProvider` path
from PR #65 instead. This restriction is documented prominently in the
`registerStreamingTable` Javadoc and tested explicitly (`secondScanThrows`).
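One way to picture the at-most-once rule is a slot that hands out its contents exactly once. The sketch below is a plain-Java illustration under that assumption; `SingleScanSlot` and `take` are hypothetical names, and the real enforcement lives on the native side, not in Java code like this.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of single-scan enforcement; not the PR's code.
final class SingleScanSlot<R> {
    // Non-null means "available"; null means "already taken".
    private final AtomicReference<R> slot;

    SingleScanSlot(R receiver) {
        if (receiver == null) {
            throw new IllegalArgumentException("receiver must not be null");
        }
        this.slot = new AtomicReference<>(receiver);
    }

    // The first caller wins the receiver; every later caller gets an exception.
    R take() {
        R receiver = slot.getAndSet(null);
        if (receiver == null) {
            throw new IllegalStateException("streaming table can only be scanned once");
        }
        return receiver;
    }
}
```

Because `getAndSet` is atomic, two scans racing on the same slot cannot both obtain the receiver.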
**Schema constraints (validated at registration).** The `schema` passed to
`registerStreamingTable` must:
- Have at least one column. Zero-column streaming would have no allocator to
borrow for the FFI scratch; rejected up front rather than failing at first
write.
- Have no dictionary-encoded fields *at any depth* (recursive walk through
`Field.getChildren()`). `Data.exportVectorSchemaRoot` would otherwise NPE on
the missing `DictionaryProvider` -- v1 cannot supply one. The error message
names the dotted path (e.g. `'row.code'`) so nested violations are debuggable.
A future overload that accepts a `DictionaryProvider` would lift this
restriction; out of scope here.
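The two checks above can be sketched as a recursive walk that carries the dotted path along. To stay self-contained, the example uses a hypothetical `FieldDesc` class instead of Arrow's `Field`; with Arrow, the equivalent test would presumably be `field.getDictionary() != null` combined with `field.getChildren()`. The class names and error messages here are assumptions, not the PR's actual text.

```java
import java.util.List;

// Hypothetical stand-in for an Arrow Field: a name, a dictionary flag, and children.
final class FieldDesc {
    final String name;
    final boolean dictionaryEncoded;
    final List<FieldDesc> children;

    FieldDesc(String name, boolean dictionaryEncoded, List<FieldDesc> children) {
        this.name = name;
        this.dictionaryEncoded = dictionaryEncoded;
        this.children = children;
    }
}

final class StreamingSchemaCheck {
    // Rejects empty schemas and dictionary-encoded fields at any nesting depth.
    static void validate(List<FieldDesc> topLevelFields) {
        if (topLevelFields.isEmpty()) {
            throw new IllegalArgumentException("schema must have at least one column");
        }
        for (FieldDesc field : topLevelFields) {
            walk(field, field.name);
        }
    }

    // Depth-first walk; 'path' is the dotted path from the root to this field.
    private static void walk(FieldDesc field, String path) {
        if (field.dictionaryEncoded) {
            throw new IllegalArgumentException(
                "dictionary-encoded field '" + path + "' is not supported");
        }
        for (FieldDesc child : field.children) {
            walk(child, path + "." + child.name);
        }
    }
}
```

Failing at registration keeps the error close to the schema definition instead of surfacing it on the first `write`.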
Native side, in a new `native/src/streaming_table.rs` module:
- `JavaPartitionStream` -- impl of upstream `PartitionStream` trait. Holds
`Mutex<ReceiverState>` (the `Available(Receiver) | Taken` enum is what enforces
single-scan: the first `execute()` call swaps to `Taken`; subsequent calls
return an error stream). The receiver is **owned exclusively** by the partition
stream -- if the registered table drops (e.g. the `SessionContext` closes
before the `TableSink`), the receiver drops with it and any outstanding
producer-side `Sender::send` returns `Err(SendError)` rather than parking on a
dangling channel.
- `TableSinkHandle` -- the opaque struct backing the Java `TableSink`'s
native handle. Holds `Mutex<Option<mpsc::Sender>>` (`close()` / `fail()` move
the sender out and drop it), a sideband `terminal_error:
Mutex<Option<DataFusionError>>` populated by `fail()`, a durable `closed_flag:
AtomicBool` and a `closed_notify: Notify` for waking parked writers. The notify
alone is not sufficient -- `notify_waiters()` only delivers to
already-registered waiters -- so writers register their `Notified` future
*before* re-checking `closed_flag` with `Acquire` ordering, defeating the
lost-wakeup race.
- `make_streaming_table(schema, capacity)` -- constructs the
`(StreamingTable, TableSinkHandle)` pair sharing a
`tokio::sync::mpsc::channel(capacity)`. The receiver is wrapped via
`tokio_stream::wrappers::ReceiverStream` plus a chained `unfold` tail that
consults the sideband `terminal_error` slot at end-of-stream and emits the
producer's terminal error if any. This avoids `fail()`'s alternative path of
`tx.blocking_send(Err(...))`, which would deadlock on a full channel pre-query.
- `import_batch_from_ffi(array_addr, schema_addr)` -- decodes a
Java-exported `(FFI_ArrowArray, FFI_ArrowSchema)` pair into a `RecordBatch`.
After import, `TableSinkHandle::write` re-attaches the registered `SchemaRef`
to the imported batch via `RecordBatch::try_new(self.schema.clone(),
batch.columns().to_vec())` -- otherwise top-level Schema metadata would be lost
(because `RecordBatch::from(StructArray)` rebuilds the schema as
`Schema::new(fields)`).
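The sideband error slot is easier to see in a small model. The following is a rough Java analogue, not a translation of the Rust code: `SidebandChannel` and its methods are hypothetical, it assumes one producer thread and one consumer thread, and it does not model the `Notify`-based wake-up of parked writers. What it does show is that `fail` records the error outside the queue, so it returns immediately even when the queue is full, and the consumer only sees the error after draining the batches that were already sent.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical analogue of a bounded channel with a sideband terminal-error slot.
final class SidebandChannel<T> {
    private final BlockingQueue<T> queue;
    private final AtomicReference<Throwable> terminalError = new AtomicReference<>();
    private volatile boolean closed;

    SidebandChannel(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Producer: parks while the queue is full.
    void write(T item) {
        if (closed) {
            throw new IllegalStateException("channel is closed");
        }
        try {
            queue.put(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while writing", e);
        }
    }

    // Producer: clean end-of-stream.
    void close() {
        closed = true;
    }

    // Producer: the error goes into the side slot, so no queue capacity is needed.
    void fail(Throwable cause) {
        terminalError.compareAndSet(null, cause);
        closed = true;
    }

    // Consumer: next item, null at clean end-of-stream, or an exception if the producer failed.
    T next() {
        try {
            while (true) {
                boolean wasClosed = closed; // read before polling so no late item is missed
                T item = queue.poll(10, TimeUnit.MILLISECONDS);
                if (item != null) {
                    return item;
                }
                if (wasClosed) {
                    Throwable error = terminalError.get();
                    if (error != null) {
                        throw new RuntimeException("producer failed", error);
                    }
                    return null;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading", e);
        }
    }
}
```

Sending the error through the queue itself would need a free slot, which is exactly the situation that can deadlock when the queue is full and no query is draining it yet.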
JNI handlers in `native/src/lib.rs`:
- `Java_..._SessionContext_registerStreamingTableNative(handle, name,
schemaIpc, capacity) -> jlong` -- registers the table and returns a
`Box<Arc<TableSinkHandle>>` pointer. The `Arc` wrapper is what makes the next
bullet safe.
- `Java_..._TableSink_writeBatchNative(handle, arrayAddr, schemaAddr)` --
imports the batch and sends through the channel. Each call clones the inner
`Arc<TableSinkHandle>` *before* doing any work, so a concurrent
`dropHandleNative` cannot free the underlying handle out from under an
in-flight write. The select! arm uses `Handle::try_current()` to detect whether
the calling thread is already inside a Tokio worker (e.g. invoked from a
`TableProvider.scan` or UDF callback dispatched by DataFusion's executor); on a
worker we use `block_in_place + Handle::block_on`, otherwise we use the shared
`crate::runtime().block_on`. Without that detection, `Runtime::block_on` panics
across the FFI boundary with "Cannot start a runtime from within a runtime".
- `Java_..._TableSink_closeSinkNative(handle)` / `failSinkNative(handle,
message)` -- close drops the sender + sets `closed_flag` + notifies; fail
records the terminal error first, then close-equivalent. Both signal close
*before* the lifecycle write lock is acquired, so any write parked on
backpressure wakes and releases its read lock -- otherwise `dropHandleNative`'s
wait for the write lock would deadlock against the parked write.
- `Java_..._TableSink_dropHandleNative(handle)` -- frees the
`Box<Arc<TableSinkHandle>>`. Other in-flight calls hold their own `Arc` clones,
so the inner handle isn't freed until all of them return.
The Java `TableSink` mirrors this with an `AtomicLong nativeHandle` and a
`ReentrantReadWriteLock` over the lifetime; `write()` holds the read lock
during the JNI call, `close()`/`fail()` flip the handle to 0 (forbidding new
writes), call `closeSinkNative`/`failSinkNative` (waking parked writes), then
take the write lock and call `dropHandleNative` (which now waits for all
readers to drain).
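The ordering described above can be sketched in plain Java with the JNI calls replaced by stubs that only record what happened. `SinkLifecycleSketch`, the stub bodies, and the `events` list are hypothetical; whether the real `close()` is idempotent and exactly where the handle check sits relative to the read lock are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of the sink lifecycle; native calls are replaced by logging stubs.
final class SinkLifecycleSketch {
    private final AtomicLong nativeHandle;
    private final ReentrantReadWriteLock lifetime = new ReentrantReadWriteLock();
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    SinkLifecycleSketch(long handle) {
        this.nativeHandle = new AtomicLong(handle);
    }

    void write(String batch) {
        lifetime.readLock().lock(); // held for the whole (stubbed) native call
        try {
            long handle = nativeHandle.get();
            if (handle == 0) {
                throw new IllegalStateException("sink is closed");
            }
            events.add("write:" + batch); // stands in for writeBatchNative
        } finally {
            lifetime.readLock().unlock();
        }
    }

    void close() {
        long handle = nativeHandle.getAndSet(0); // forbid new writes
        if (handle == 0) {
            return; // assumed idempotent: already closed
        }
        events.add("closeSink"); // stands in for closeSinkNative: wakes parked writers
        lifetime.writeLock().lock(); // waits for in-flight writes to release the read lock
        try {
            events.add("dropHandle"); // stands in for dropHandleNative
        } finally {
            lifetime.writeLock().unlock();
        }
    }
}
```

The key point is that the close signal is sent before the write lock is requested; if the order were reversed, a writer parked on backpressure would keep its read lock and `close()` would wait forever.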
The `TableSink.write` Java path derives the FFI scratch allocator from the
producer's batch (`batch.getFieldVectors().get(0).getAllocator()`) so the
exported buffers share an allocator-root with the producer's vectors. Using a
separate `RootAllocator` would make `Data.exportVectorSchemaRoot` reject the
cross-root transfer.
Cancellation propagates automatically: when DataFusion drops the consumer
(query cancelled, `LIMIT N` short-circuits, error in another operator), the
receiver is dropped, `Sender::send` resolves to `Err(SendError)`, and the JNI
handler surfaces that as a `RuntimeException` on the producer thread. Producers
that ignore the exception leak no resources because the sender's `Drop` runs
unconditionally.
`tokio-stream` is added as a direct Cargo dependency (it was already pulled
in transitively by `datafusion-physical-plan`; declaring it directly insulates
us from transitive resolution drift). The `tokio` dependency gains the `sync`
feature for `Notify`.
Out of scope (for follow-ups):
- **Multi-partition `registerStreamingTable`.** `StreamingTable::try_new`
accepts `Vec<Arc<dyn PartitionStream>>`; an overload accepting multiple sinks
is a small additive follow-up.
- **`try_write(batch) -> boolean`** -- non-blocking variant. Easy to add as
a method on `TableSink`.
- **`writeAsync` returning `CompletableFuture`** -- pairs with the broader
async API surface tracked as #9.
- **Re-scannable streaming.** Would force buffering every batch; defeats the
use case. Use pull-mode `registerTable` instead.
- **Dictionary-encoded fields.** Would need a `DictionaryProvider` overload
on `registerStreamingTable`. Tracked as a future enhancement.
## Are these changes tested?
Yes -- 18 new tests in `SessionContextStreamingTableTest`.
## Are there any user-facing changes?
Yes -- purely additive. New public API:
- `org.apache.datafusion.TableSink` (final, `AutoCloseable`)
- `SessionContext.registerStreamingTable(String, Schema, int) -> TableSink`
No API removals, no deprecations, no behavior change for existing callers.
The native binary picks up `tokio-stream` as a direct dependency, but it was
already in the Cargo dependency graph transitively, so the binary size delta is
zero. The `tokio` dependency gains the `sync` feature, which is small (`Notify`, `mpsc`).