Hi Vinoth,
I created a PR (https://github.com/apache/hudi/pull/8376) for this feature,
could you help review it?

BR,
Kong

At 2023-04-05 00:19:20, "Vinoth Chandar" <vin...@apache.org> wrote:
>Look forward to this! could really help backfill/rebootstrap scenarios.
>
>On Tue, Apr 4, 2023 at 9:18 AM Vinoth Chandar <vin...@apache.org> wrote:
>
>> Thinking out loud.
>>
>> 1. For insert operations, it should not matter anyway.
>> 2. For upsert etc, the preCombine would handle the ordering problems.
>>
>> Is that what you are saying? I feel we don't want to leak any Kafka
>> specific logic or force use of special payloads etc. Thoughts?
>>
>> I assigned the jira to you and also made you a contributor. So in future,
>> you can self-assign.
>>
>> On Mon, Apr 3, 2023 at 7:08 PM 孔维 <18701146...@163.com> wrote:
>>
>>> Hi,
>>>
>>> Yes, we can create multiple Spark input partitions per Kafka partition.
>>>
>>> I think the write operations can handle the potentially out-of-order
>>> events, because before writing we need to preCombine the incoming events
>>> using the source-ordering-field, and we also need to
>>> combineAndGetUpdateValue with records on storage. From a business
>>> perspective, we use the combine logic to keep our data correct. And
>>> Hudi does not require any guarantees about the ordering of Kafka events.
>>>
>>> I already filed one JIRA [https://issues.apache.org/jira/browse/HUDI-6019],
>>> could you help assign the JIRA to me?
>>>
>>> At 2023-04-03 23:27:13, "Vinoth Chandar" <vin...@apache.org> wrote:
>>> >Hi,
>>> >
>>> >Does your implementation read out offset ranges from Kafka partitions?
>>> >Which means we can create multiple Spark input partitions per Kafka
>>> >partition?
>>> >If so, +1 for the overall goals here.
>>> >
>>> >How does this affect ordering? Can you think about how/if Hudi write
>>> >operations can handle potentially out-of-order events being read out?
>>> >It feels like we can add a JIRA for this anyway.
>>> >
>>> >On Thu, Mar 30, 2023 at 10:02 PM 孔维 <18701146...@163.com> wrote:
>>> >
>>> >> Hi team, for the Kafka source, when pulling data from Kafka, the
>>> >> default parallelism is the number of Kafka partitions.
>>> >> There are cases:
>>> >>
>>> >> Pulling a large amount of data from Kafka (e.g. maxEvents=100000000)
>>> >> when the # of Kafka partitions is not enough: the pull will cost
>>> >> too much time, or even worse cause executor OOM.
>>> >> There is huge data skew between Kafka partitions: the pull will be
>>> >> blocked by the slowest partition.
>>> >>
>>> >> To solve those cases, I want to add a parameter,
>>> >> hoodie.deltastreamer.kafka.per.batch.maxEvents, to control the
>>> >> maxEvents in one Kafka batch. The default, Long.MAX_VALUE, means the
>>> >> feature is not turned on.
>>> >> The hoodie.deltastreamer.kafka.per.batch.maxEvents configuration will
>>> >> take effect after the hoodie.deltastreamer.kafka.source.maxEvents
>>> >> config.
>>> >>
>>> >> Here is my POC of the improvement:
>>> >> max executor cores is 128.
>>> >> With the feature off
>>> >> (hoodie.deltastreamer.kafka.source.maxEvents=50000000):
>>> >>
>>> >> With the feature on
>>> >> (hoodie.deltastreamer.kafka.per.batch.maxEvents=200000):
>>> >>
>>> >> After turning the feature on, the timing of Tagging reduced from 4.4
>>> >> mins to 1.1 mins, and could be even faster if given more cores.
>>> >>
>>> >> What do you think? Can I file a JIRA issue for this?
>>> >>
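P.S. For anyone skimming the thread, here is a minimal sketch of the splitting idea being discussed: capping each Spark input partition at maxEvents offsets so one large or skewed Kafka partition fans out to several tasks. This is illustration only, not the PR code; the actual change in PR 8376 works against Spark's org.apache.spark.streaming.kafka010.OffsetRange, and the SubRange class and split method below are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetRangeSplitter {

    // Stand-in for Spark's OffsetRange: one Kafka partition's
    // half-open offset interval [from, until).
    static final class SubRange {
        final int partition;
        final long from;   // inclusive
        final long until;  // exclusive
        SubRange(int partition, long from, long until) {
            this.partition = partition;
            this.from = from;
            this.until = until;
        }
    }

    // Split one Kafka partition's pending offsets into chunks of at most
    // maxEvents each, so a large or skewed partition can be consumed by
    // several Spark tasks instead of a single slow one.
    static List<SubRange> split(int partition, long from, long until, long maxEvents) {
        List<SubRange> out = new ArrayList<>();
        for (long start = from; start < until; start += maxEvents) {
            out.add(new SubRange(partition, start, Math.min(start + maxEvents, until)));
        }
        return out;
    }

    public static void main(String[] args) {
        // 1,000,000 pending offsets capped at 200,000 per chunk -> 5 tasks
        List<SubRange> ranges = split(0, 0L, 1_000_000L, 200_000L);
        System.out.println(ranges.size()); // 5
    }
}
```

With the default (maxEvents = Long.MAX_VALUE) the loop produces a single range per partition, i.e. today's behavior; setting it to 200000 as in the POC above is what fans the read out across more executor cores.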