Hi Raghu,

Hudi has a property named "hoodie.bloom.index.update.partition.path". You might want to try setting it to true to get the behavior you are expecting. Its default value is false. Here are the docs for this config:
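As a quick sketch, the property would go alongside the index type you already set in hudi_base.properties (the property name comes from the javadoc quoted below; the rest matches your existing file):

```properties
# Existing setting from your hudi_base.properties
hoodie.index.type=GLOBAL_BLOOM
# New: move a record to its new partition (and delete it from the old one)
# when its partition-path field changes on update
hoodie.bloom.index.update.partition.path=true
```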
/**
 * Only applies if index type is GLOBAL_BLOOM.
 * <p>
 * When set to true, an update to a record with a different partition from its existing one
 * will insert the record to the new partition and delete it from the old partition.
 * <p>
 * When set to false, a record will be updated to the old partition.
 */

Let me know if this makes sense and is what you are expecting.

On Sun, May 24, 2020 at 10:14 AM Dubey, Raghu <[email protected]> wrote:

> Hi Team,
>
> I used DeltaStreamer to ingest data and performed a test where the partition
> column changes. When the partition column in my dataset got updated, my Hive
> query on the Hudi dataset returned 2 rows for the same recordKey. This was
> expected, and I found the explanation in this issue:
> https://github.com/apache/incubator-hudi/issues/423
>
> However, as per the link
> https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-HowdoestheHudiindexingwork&whatareitsbenefits,
> I tried setting hoodie.index.type=GLOBAL_BLOOM to circumvent this issue.
> Once I do this, I no longer see 2 records in the Hive query.
>
> But I was expecting that it should move the record from one partition to
> another and update the partition column in Hudi (in my case store_id). I
> only have 2 store IDs (1 and 2). But if you look at the results of the
> queries before and after the update statement, it seems to have stopped
> updating the partition path altogether.
>
> Am I doing something wrong here? I want global uniqueness of data on
> recordKey irrespective of partition path. Please help.
>
> Below is the updated incremental source data that was used (store_id is
> updated to 1 instead of 2).
> {
>   "Op": "U",
>   "src_received_ts": "2020-05-22 15:03:51.000000",
>   "tran_id": 18,
>   "tran_date": "2019-03-16",
>   "store_id": 1,
>   "store_city": "CHICAGO",
>   "store_state": "IL",
>   "item_code": "XXXXXX",
>   "quantity": 126
> }
>
> Before: ##########
>
> hive> select * from XXXXX where tran_id=18;
> OK
> _hoodie_commit_time  _hoodie_commit_seqno  _hoodie_record_key  _hoodie_partition_path  _hoodie_file_name  op  src_received_ts  tran_id  tran_date  store_city  store_state  item_code  quantity  total  store_id
> 20200519123838  20200519123838_1_1  18  2  4b683050-4151-4d76-a22a-355f73f61b15-0_1-23-21_20200519123838.parquet  I  2020-05-19 12:26:03.000000  18  2019-03-16  CHICAGO  IL  XXXXXX  126  22.0  2
>
> After: ##########
>
> hive> select * from XXXXX where tran_id=18;
> OK
> _hoodie_commit_time  _hoodie_commit_seqno  _hoodie_record_key  _hoodie_partition_path  _hoodie_file_name  op  src_received_ts  tran_id  tran_date  store_city  store_state  item_code  quantity  total  store_id
> 20200522150752  20200522150752_0_1  18  2  4b683050-4151-4d76-a22a-355f73f61b15-0_0-23-20_20200522150752.parquet  U  2020-05-22 15:03:51.000000  18  2019-03-16  CHICAGO  IL  XXXXXX  126  1.19E-43  2
> Time taken: 0.952 seconds, Fetched: 1 row(s)
>
> Hudi command:
>
> spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
>   --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
>   --master yarn \
>   --deploy-mode cluster \
>   /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
>   --table-type COPY_ON_WRITE \
>   --source-ordering-field src_received_ts \
>   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
>   --target-base-path s3://xxxx/xxxxxxx \
>   --target-table xxxxxxx \
>   --props s3://xxxx/xxx/hudi_base.properties \
>   --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
>   --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
>   --enable-hive-sync
> Here is the hudi_base.properties file:
>
> hoodie.cleaner.commits.retained=2
> hoodie.upsert.shuffle.parallelism=1
> hoodie.insert.shuffle.parallelism=1
> hoodie.bulkinsert.shuffle.parallelism=1
> hoodie.datasource.write.recordkey.field=tran_id
> hoodie.datasource.hive_sync.partition_fields=store_id
> hoodie.datasource.write.partitionpath.field=store_id
> hoodie.datasource.hive_sync.database=default
> hoodie.datasource.hive_sync.table=XXXX
> hoodie.datasource.hive_sync.assume_date_partitioning=false
> hoodie.deltastreamer.schemaprovider.target.schema.file=s3://XXXX/XXXX/trgt.avsc
> hoodie.deltastreamer.schemaprovider.source.schema.file=s3://XXXX/XXX/src.avsc
> hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
> hoodie.deltastreamer.transformer.sql=select Op,src_received_ts,tran_id,tran_date,store_id,store_city,store_state,item_code,quantity from <SRC>
> hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://XXX:XXXX
> hoodie.deltastreamer.source.dfs.root=s3://XXXX/ XXX
> hoodie.index.type=GLOBAL_BLOOM
>
> Thanks,
> Raghu
>

--
Regards,
-Sivabalan
