Hi Gautam,
Independent of using Hudi, it is best practice to manage the schemas of your 
organization's datasets through a central mechanism such as a schema registry. 
Without one, it is pretty difficult to evolve a schema. It is the schema 
registry's responsibility to provide the correct schema for each incoming batch. 
As you have noted, DeltaStreamer comes with integrations for 
FileBasedSchemaProvider and Confluent's schema registry. It is pretty easy to 
add an integration with any other schema provider. 
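
To make the "union schema" idea concrete, here is a minimal sketch. It assumes a simplified model in which a schema is just an ordered list of named fields; `Field`, `SchemaUnion` and `union` are hypothetical names used for illustration and are not Hudi or Avro APIs. A real schema provider would do the equivalent on Avro schemas and return the result to DeltaStreamer.

```scala
// Simplified stand-in for a schema: an ordered list of named, typed fields.
// Field and SchemaUnion are hypothetical names, not part of Hudi or Avro.
final case class Field(name: String, dataType: String, nullable: Boolean)

object SchemaUnion {
  // Keep every field of `existing` in its original order, then append the
  // fields that appear only in `incoming`. A field present on one side only
  // is marked nullable, so records that lack it can still be written.
  // If both sides define a field, the existing definition wins (this sketch
  // does not try to reconcile conflicting types).
  def union(existing: Seq[Field], incoming: Seq[Field]): Seq[Field] = {
    val existingNames = existing.map(_.name).toSet
    val incomingNames = incoming.map(_.name).toSet

    val kept = existing.map { f =>
      if (incomingNames.contains(f.name)) f else f.copy(nullable = true)
    }
    val added = incoming
      .filterNot(f => existingNames.contains(f.name))
      .map(_.copy(nullable = true))

    kept ++ added
  }
}
```

For a four-column existing schema (pk, col1, col2, ts) and a three-column incoming schema (pk, col1, ts), assuming the key column has the same name in both, `SchemaUnion.union` keeps all four columns and marks col2 as nullable.
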
IIUC, the loading from parquet is a one-time bootstrap. Please take a look at 
https://hudi.apache.org/migration_guide.html#option-1

Regarding CSV incremental upsert, we have an active HIP 
(https://cwiki.apache.org/confluence/display/HUDI/HIP-1) for supporting CSV 
sources in DeltaStreamer. So, if you want to use DeltaStreamer as-is, you can 
do a simple conversion of CSV to JSON, and DeltaStreamer would be able to 
ingest the result now.
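
As a rough illustration of the "simple conversion of CSV to JSON" step, here is a minimal sketch in plain Scala. `CsvToJson` and `convert` are hypothetical names, not part of Hudi. The sketch assumes the first line is a header, that no field contains commas or quoted delimiters, and it emits every value as a JSON string; a real conversion would use a proper CSV parser and emit values typed according to the schema.

```scala
// Hypothetical helper, not part of Hudi: turns simple CSV lines into JSON lines.
// Assumes a header line and fields without embedded commas or quoting.
object CsvToJson {
  // Escape the common characters that must not appear raw in a JSON string.
  private def escape(s: String): String = {
    val sb = new StringBuilder
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      case c    => sb.append(c)
    }
    sb.toString
  }

  // The first line names the columns; each later non-blank line becomes one
  // JSON object. A row with fewer values than the header simply omits the
  // trailing columns.
  def convert(lines: Seq[String]): Seq[String] =
    if (lines.isEmpty) Seq.empty
    else {
      val names = lines.head.split(",", -1).map(_.trim)
      lines.tail.filter(_.trim.nonEmpty).map { row =>
        val values = row.split(",", -1).map(_.trim)
        names.zip(values)
          .map { case (k, v) => "\"" + escape(k) + "\":\"" + escape(v) + "\"" }
          .mkString("{", ",", "}")
      }
    }
}
```

Each returned string is one JSON record per line, which can be written out to files for a JSON-based source to pick up.
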
Balaji.V






    On Saturday, September 14, 2019, 12:38:03 PM PDT, Gautam Nayak 
<gna...@guardanthealth.com> wrote:  
 
 Thanks Balaji for the detailed information. One of our pipelines sources data 
from databases [1] (incremental Sqoop) as parquet files, and the other pipeline 
sources system-generated incremental CSVs [2]. Both have to be persisted and 
read as Hive/Presto tables. In both cases, we are seeing columns getting 
removed over time. We want our warehouse tables to keep track of all the 
columns from the day we started ingesting. We currently do this using a bulk 
merge (Spark) and custom schema evolution, which is not appropriate for large 
datasets considering the full data scan it has to go through.

For [1], we are looking to use HoodieDeltaStreamer, but since the data is in 
parquet, we are not sure if it's supported.
For [2], we are unsure about using HoodieDeltaStreamer, so we also want to have 
the option of using the Datasource writer.

As you mentioned, for HoodieDeltaStreamer we need to provide a custom 
schema-provider class that unions the schemas. I am not sure, though, whether 
this would be an ad hoc process that first reads the incremental data, infers 
its schema, and then unions it with the existing schema. The implementations 
I see in hudi-utilities read the schema from File, Row, or SchemaRegistry, and 
none of them unions schemas. Does hoodie provide this functionality?
Thanks,
Gautam


> On Sep 14, 2019, at 12:34 AM, vbal...@apache.org wrote:
> 
> Hi Gautam,
> What I understood is that you are trying to incrementally ingest from 
> RowBasedSource. It is not clear to me whether this upstream source is another 
> HoodieIncrSource. If that is the case, I am not sure how the second batch 
> would miss the columns. Can you elaborate more on the setup and on what your 
> upstream source is?
> Anyway, it is OK for the incremental dataset (the second batch to be 
> ingested) to have fewer columns than the first batch, as long as the missing 
> columns are nullable (Avro backwards compatible). But per contract, Hudi 
> needs the latest schema (the union schema) for every ingestion run. If you 
> passed the schema with columns missing, then it is possible to lose those 
> columns. Hudi COW reads the older version of the file and creates a newer 
> version using the schema passed. So, if the schema passed has missing 
> columns, both the old records and the new records in the same file will 
> be missing those columns.
> IIUC, you would need to provide a schema-provider in the HoodieDeltaStreamer 
> execution (--schema-provider-class) where the schema returned is the 
> union schema.
> Let me know if this makes sense. Also, please elaborate on your pipeline setup.
> Thanks,
> Balaji.V
> 
>    On Friday, September 13, 2019, 02:33:16 PM PDT, Gautam Nayak 
><gna...@guardanthealth.com> wrote:  
> 
> Hi,
> We have been evaluating Hudi, and there is one use case we are trying to 
> solve, where incremental datasets can have fewer columns than the ones that 
> have already been persisted in Hudi format.
> 
> For example, in the initial batch we have a total of 4 columns:
>    val initial = Seq(("id1", "col1", "col2", 123456)).toDF("pk", "col1", "col2", "ts")
> 
> and in the incremental batch we have 3 columns:
>    val incremental = Seq(("id2", "col1", 123879)).toDF("id", "col1", "ts")
> 
> We want a union of the initial and incremental schemas such that col2 of 
> id2 has some default value associated with it. But what we are seeing is the 
> latest schema (incremental) for both records when we persist the data 
> (COW) and read it back through Spark. The actual incremental datasets would 
> be in Avro format, but we do not maintain their schemas.
> I tried looking through the documentation to see if there is a specific 
> configuration to achieve this, but couldn’t find any.
> We would also want to achieve this via DeltaStreamer and then query these 
> results from Presto.
> 
> Thanks,
> Gautam