Hi Jark,

Thanks for your review comments. I have revised the FIP document
accordingly. I sincerely apologize that progress on this FIP was paused for
quite some time due to other tasks; I will continue to move this work
forward.

Best,
Yang

Jark Wu <[email protected]> 于2025年8月10日周日 13:11写道:

> Hi Yang,
>
> Thanks for driving this excellent work! I left some comments below:
>
> *1. "If we use a filter() like Fluent API, we may lead users to
> misunderstand the real semantics of the interface."*
>
> I don't think this will give users the impression that the server side
> performs row-by-row filtering, because we already provide similar methods
> like "project()" and "limit()", and none of them are row-by-row operations.
> We can add the necessary Javadocs to make it clear that the method performs
> server-side, record-batch-level filtering.
>
> Personally, I don't like "createLogScanner(Predicate recordBatchFilter)"
> because it will lead to an explosion of overloads as we add more filter
> types and parameters. The existing builder API pattern has better
> extensibility. Besides, the filter pushdown is record-batch-level today but
> may become file-level in the future; that optimization should be
> transparent to users and should not require an API change.
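[For illustration, a builder-style scan API along these lines keeps the filter extensible; all class and method names below are hypothetical stand-ins, not the actual Fluss interfaces. The `filter()` Javadoc would state that it is record-batch-level, best-effort skipping, never row-by-row filtering.]

```java
import java.util.ArrayList;
import java.util.List;

public class ScanBuilderSketch {
    // Stand-in for a pushed-down predicate evaluated against batch statistics.
    interface Predicate { boolean test(int minValue, int maxValue); }

    static class Scan {
        private final List<int[]> keptBatches;
        Scan(List<int[]> batches) { this.keptBatches = batches; }
        int batchCount() { return keptBatches.size(); }
    }

    static class ScanBuilder {
        private Predicate batchFilter = (min, max) -> true; // default: keep all batches
        private final List<int[]> batches; // each int[]{min, max} mimics per-batch stats

        ScanBuilder(List<int[]> batches) { this.batches = batches; }

        // Fluent filter(): record-batch-level, best-effort skipping only.
        ScanBuilder filter(Predicate p) { this.batchFilter = p; return this; }

        Scan build() {
            List<int[]> kept = new ArrayList<>();
            for (int[] stats : batches) {
                if (batchFilter.test(stats[0], stats[1])) kept.add(stats);
            }
            return new Scan(kept);
        }
    }

    public static void main(String[] args) {
        List<int[]> batches =
                List.of(new int[]{0, 9}, new int[]{10, 19}, new int[]{20, 29});
        // Skip batches whose max is below 10: drops only the first batch.
        Scan scan = new ScanBuilder(batches).filter((min, max) -> max >= 10).build();
        System.out.println(scan.batchCount()); // prints 2
    }
}
```

Adding a future file-level filter would then be a new builder method rather than another `createLogScanner` overload.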
>
> *2. The "Predicate.proto" file*
>
> The FIP introduces a dedicated "Predicate.proto" file for the predicate
> expressions, but this breaks the Fluss single-file RPC definition. I don't
> see the benefit of splitting into multiple proto files; it only makes the
> Fluss RPC definition harder for third parties to understand. Therefore, I
> suggest putting the predicate definition in "FlussApi.proto".
>
> *3. Suggestions on some Predicate proto definitions*
> (1) There is no definition of PbLiteralValue. This is a public API and
> shouldn't be omitted from the FIP. The API is the most important part to
> discuss in a FIP, even if it makes the FIP large.
> (2) Can we use an int index to represent the PbFieldRef? The server side
> has the schema information of the table, so it can derive the data types
> and column names on the server side. We also do this for projection pushdown.
> (3)
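[A rough sketch of the int-index idea in (2): the client ships only a field index, and the server resolves the column name and type from its own copy of the table schema. All names below are illustrative assumptions, not the FIP's actual definitions.]

```java
import java.util.List;

public class FieldIndexSketch {
    // Server-side schema; the client's field reference carries only the index.
    record Column(String name, String type) {}

    static final List<Column> SCHEMA = List.of(
            new Column("order_id", "BIGINT"),
            new Column("amount", "INT"),
            new Column("region", "STRING"));

    // Mimics resolving a PbFieldRef that carries just an int index:
    // the server derives the column name and data type itself.
    static Column resolveFieldRef(int fieldIndex) {
        return SCHEMA.get(fieldIndex);
    }

    public static void main(String[] args) {
        Column c = resolveFieldRef(1);
        System.out.println(c.name() + ":" + c.type()); // amount:INT
    }
}
```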
>
> *4. Statistics Data format*
>
> The statistics data format is the most critical part of this FIP and should
> be drawn like the RecordBatch format. I think we have many topics to
> discuss about the statistics data format:
> (1) Should the statistics data be placed before or after the records data?
> I think putting the statistics together with the header can make filtering
> a record batch faster, as it reduces the read to one IO.
> (2) Can the statistics format support lazy deserialization of specific
> columns? With 1000 columns and a filter on only one of them, the
> deserialization cost may be larger than a direct read without filtering.
> (3) How will the statistics format evolve to support future statistics?
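[One way to support (2) is per-column addressing inside the statistics section, so only the filtered column's stats are ever decoded. The fixed-width layout below is a minimal assumption for illustration; the FIP's real format would need versioning, null counts, and variable-length values with an explicit offset array.]

```java
import java.nio.ByteBuffer;

public class LazyStatsSketch {
    // Assumed layout: 8 bytes per column, [minInt][maxInt], densely packed.
    static ByteBuffer encode(int[][] minMaxPerColumn) {
        ByteBuffer buf = ByteBuffer.allocate(minMaxPerColumn.length * 8);
        for (int[] mm : minMaxPerColumn) {
            buf.putInt(mm[0]);
            buf.putInt(mm[1]);
        }
        return buf.flip();
    }

    // Lazily decode stats for a single column by seeking to its offset;
    // the other columns are never deserialized.
    static int[] readColumnStats(ByteBuffer stats, int columnIndex) {
        int offset = columnIndex * 8; // fixed-width entries make offsets trivial
        return new int[]{stats.getInt(offset), stats.getInt(offset + 4)};
    }

    public static void main(String[] args) {
        ByteBuffer stats = encode(new int[][]{{1, 5}, {10, 99}, {-3, 0}});
        int[] col1 = readColumnStats(stats, 1);
        System.out.println(col1[0] + ".." + col1[1]); // 10..99
    }
}
```

With 1000 columns and one filtered column, this touches 8 bytes instead of the whole statistics section.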
>
>
> *5. FilteredLogRecordBatchIterator reads each RecordBatch*
>
> You said the FilteredLogRecordBatchIterator will try to read the statistics
> information from each RecordBatch. I'm worried about the performance
> regression this introduces. By default (when there is no projection
> pushdown), Fluss reads chunks without deserializing each record batch
> header. However, with a filter pushdown, even if the filter doesn't filter
> anything, and even if the log table doesn't have statistics, it falls back
> to reading each batch header, which will be much slower than before.
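[The concern above can be expressed as a per-fetch fast-path check: only a filter on a table that actually carries statistics should force per-batch header deserialization. The predicate below is a hypothetical sketch of that decision, not Fluss code.]

```java
public class FastPathSketch {
    // Decide whether the read path can serve raw chunks without touching
    // individual batch headers; flags are illustrative.
    static boolean canUseRawChunkPath(boolean hasFilter,
                                      boolean hasProjection,
                                      boolean tableHasStatistics) {
        // Projection always needs per-batch work; a filter only does when
        // there are statistics to evaluate it against.
        return !hasProjection && (!hasFilter || !tableHasStatistics);
    }

    public static void main(String[] args) {
        System.out.println(canUseRawChunkPath(true, false, false));  // filter, no stats
        System.out.println(canUseRawChunkPath(true, false, true));   // must read headers
        System.out.println(canUseRawChunkPath(false, false, false)); // plain read
    }
}
```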
>
> *6. Migration Strategy: Deploy new version with feature flag disabled*
>
> The migration strategy in your reply to Yuxia sounds very complex and
> error-prone to me.
>
> I think an easier approach for migration and backward compatibility is to
> introduce a table-level statistics configuration, such as
> "table.log.statistic.enabled". Every table created in a new-version cluster
> will have this configuration set to "true" unless users disable it
> explicitly. For old tables, we keep the old behavior of not generating
> statistics, since generating them would require a client upgrade. For new
> tables with the config enabled, an upgraded client is required; otherwise,
> the client or server should throw an exception about missing statistics.
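[The gating described above might look like the following; the option key is taken from the proposal, while the method names, the default, and the exception type are assumptions for illustration.]

```java
import java.util.Map;

public class StatisticsGateSketch {
    // Option key from the proposal; old tables simply lack this key.
    static final String STATS_KEY = "table.log.statistic.enabled";

    // Old tables (key absent) keep the old behavior; new tables get the key
    // written as "true" at creation unless the user disabled it.
    static boolean statisticsEnabled(Map<String, String> tableOptions) {
        return Boolean.parseBoolean(tableOptions.getOrDefault(STATS_KEY, "false"));
    }

    // Check before accepting a filter pushdown; exception type is illustrative.
    static void checkFilterPushdown(Map<String, String> tableOptions) {
        if (!statisticsEnabled(tableOptions)) {
            throw new IllegalStateException(
                    "Table has no record-batch statistics; filter pushdown unsupported.");
        }
    }

    public static void main(String[] args) {
        System.out.println(statisticsEnabled(Map.of()));                  // old table
        System.out.println(statisticsEnabled(Map.of(STATS_KEY, "true"))); // new table
    }
}
```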
>
>
> Best,
> Jark
>
>
>
> On Fri, 8 Aug 2025 at 13:46, Yang Wang <[email protected]> wrote:
>
> > From: Yang Wang <[email protected]>
> > Date: Fri, Aug 8, 2025 at 11:17
> > Subject: Re: [DISCUSS] FIP-10: Support Log RecordBatch Filter Pushdown
> > To: <[email protected]>
> >
> >
> > Hi Cheng,
> >
> > > Can we clarify that this filter evaluation works on a best-effort basis
> > > at the beginning of the FIP document? Specifically, it only performs
> > > coarse-grained block skipping by leveraging RecordBatch statistics. To
> > > be honest, the table.newScan().filter(recordBatchFilter) API gave me the
> > > impression that the server side performs row-by-row filtering.
> >
> > This question relates to what I want to apologize to @HongShun for again,
> > as my reply to his review yesterday was not well considered. I will
> > clarify that the previously designed API:
> >
> > > LogScanner createLogScanner(Predicate recordBatchFilter);
> >
> > It can clearly hint to the user that the filter is responsible for
> > filtering record batches only (not at the row level) for log tables. If
> > we use a filter() like Fluent API, we may lead users to misunderstand the
> > real semantics of the interface.
> >
> > Best regards,
> > Yang
> >
> >
> > Wang Cheng <[email protected]> wrote on Fri, Aug 8, 2025 at 08:38:
> >
> > > Thanks Yang for driving this work.
> > >
> > >
> > > Can we clarify that this filter evaluation works on a best-effort basis
> > > at the beginning of the FIP document? Specifically, it only performs
> > > coarse-grained block skipping by leveraging RecordBatch statistics. To
> > > be honest, the table.newScan().filter(recordBatchFilter) API gave me the
> > > impression that the server side performs row-by-row filtering.
> > >
> > >
> > >
> > > Regards,
> > > Cheng
> > >
> > > ------------------ Original ------------------
> > > From: "dev" <[email protected]>
> > > Date: Thu, Aug 7, 2025 11:11 AM
> > > To: "dev" <[email protected]>
> > > Subject: [DISCUSS] FIP-10: Support Log RecordBatch Filter Pushdown
> > >
> > >
> > >
> > > Hello Fluss Community,
> > >
> > > I propose initiating discussion on FIP-10: Support Log RecordBatch
> > > Filter Pushdown (
> > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-10%3A+Support+Log+RecordBatch+Filter+Pushdown
> > > ). This optimization aims to improve the performance of log table
> > > queries and is now ready for community feedback.
> > >
> > > This FIP introduces RecordBatch-level filter pushdown to enable early
> > > filtering at the storage layer, thereby saving CPU, memory, and network
> > > resources by skipping non-matching log record batches.
> > >
> > > A proof-of-concept (PoC) has been implemented in the logfilter branch
> > > at https://github.com/platinumhamburg/fluss and is ready for testing
> > > and preview.
> >
>
