Hi Gautam,
What I understood is that you are trying to incrementally ingest from a
RowBasedSource. It is not clear to me whether this upstream source is another
HoodieIncrSource. If that is the case, I am not sure how the second batch would
miss the columns. Can you elaborate on your setup and on what your upstream
source is?
In any case, it is fine for the incremental dataset (the second batch to be
ingested) to have fewer columns than the first batch, as long as the missing
columns are nullable (i.e., Avro backward compatible). But per the contract,
Hudi needs the latest schema (the union schema) for every ingestion run. If you
passed a schema with columns missing, then it is possible to lose those
columns: Hudi COW reads the older version of the file and writes a newer
version using the schema passed in. So, if the passed schema is missing
columns, both the old records and the new records that land in the same file
will lose those columns.
IIUC, you would need to provide a schema provider in the HoodieDeltaStreamer
invocation (--schema-provider-class) that returns the union schema.
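For instance, with the file-based schema provider that ships with the
hudi-utilities bundle, a run could look roughly like the sketch below. The
class names, flag spellings, paths, and property keys here are from memory and
purely illustrative, so please verify them against your Hudi version:

```shell
# Sketch: point DeltaStreamer at a union .avsc via a file-based
# schema provider (names and paths are illustrative).
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.source.schema.file=/schemas/union.avsc \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.target.schema.file=/schemas/union.avsc \
  --target-base-path /data/hudi/my_table \
  --target-table my_table
```

The key point is that both the source and target schema files contain the
union schema, so every ingestion run rewrites files with the full column set.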
Let me know if this makes sense. Also please elaborate on your pipeline setup.
Thanks,
Balaji V.
On Friday, September 13, 2019, 02:33:16 PM PDT, Gautam Nayak
<[email protected]> wrote:
Hi,
We have been evaluating Hudi, and there is one use case we are trying to
solve: incremental datasets can have fewer columns than the ones already
persisted in Hudi format.
For example: in the initial batch, we have a total of 4 columns
val initial = Seq(("id1", "col1", "col2", 123456)).toDF("pk", "col1",
"col2", "ts")
and in the incremental batch, we have 3 columns
val incremental = Seq(("id2", "col1", 123879)).toDF("pk", "col1", "ts")
We want a union of the initial and incremental schemas, such that col2 of id2
gets some default value. But what we are seeing, when we persist the data
(COW) and read it back through Spark, is the latest (incremental) schema for
both records. The actual incremental datasets will be in Avro format, but we
do not maintain their schemas.
I tried looking through the documentation to see if there is a specific
configuration to achieve this, but couldn’t find any.
We would also want to achieve this via Deltastreamer and then query these
results from Presto.
Thanks,
Gautam