Thanks, Yunhong. That's correct; I was able to make it work locally. I am currently writing a FLIP for the changes needed in the SupportsFilterPushDown API to support nested-field filter pushdown.
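For anyone following the thread, here is a minimal sketch of the idea discussed below: a field reference that carries an int[] index path, which makes a list of such references equivalent to the int[][] used for projection pushdown. This is only an illustration under stated assumptions. NestedFieldRef, toIndexPaths and resolve are hypothetical names and are not Flink classes or APIs, and rows are modelled as plain nested Object[] arrays.

```java
import java.util.Arrays;
import java.util.List;

/** Illustration only: none of these types exist in Flink. */
public class NestedFieldPathSketch {

    /** Hypothetical field reference holding an index path, e.g. {1, 1}. */
    static final class NestedFieldRef {
        final String name;
        final int[] indexPath;

        NestedFieldRef(String name, int... indexPath) {
            this.name = name;
            this.indexPath = indexPath.clone();
        }
    }

    /** Collects the index paths into an int[][], one path per reference. */
    static int[][] toIndexPaths(List<NestedFieldRef> refs) {
        int[][] paths = new int[refs.size()][];
        for (int i = 0; i < refs.size(); i++) {
            paths[i] = refs.get(i).indexPath.clone();
        }
        return paths;
    }

    /** Walks a row (modelled as nested Object[] arrays) along an index path. */
    static Object resolve(Object[] row, int[] indexPath) {
        Object current = row;
        for (int index : indexPath) {
            current = ((Object[]) current)[index];
        }
        return current;
    }

    public static void main(String[] args) {
        // Row shape assumed for the example: (id, user(name, age)).
        Object[] row = {42L, new Object[] {"alice", 30}};
        NestedFieldRef id = new NestedFieldRef("id", 0);
        NestedFieldRef userAge = new NestedFieldRef("user.age", 1, 1);

        int[][] paths = toIndexPaths(Arrays.asList(id, userAge));
        System.out.println(Arrays.deepToString(paths));
        System.out.println(resolve(row, userAge.indexPath));
    }
}
```

A top-level field is just a path of length one, so the same representation covers both the direct and the nested case.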
Regards
Venkata krishnan

On Mon, Aug 7, 2023 at 8:28 PM yh z <zhengyunhon...@gmail.com> wrote:

> Hi Venkatakrishnan,
> Sorry for the late reply. I have looked at the code and I think you need
> to modify the logic of the
> ExpressionConverter.visit(FieldReferenceExpression expression) method to
> support nested types, which are not supported in the current code.
>
> Regards,
> Yunhong Zheng (Swuferhong)
>
> On Mon, Aug 7, 2023 at 13:30, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
>
> > (Sorry, I pressed send too early)
> >
> > Thanks for the help @zhengyunhon...@gmail.com.
> >
> > I agree on changing the API as little as possible, and also on
> > eventually simplifying projection pushdown with nested fields.
> >
> > In terms of the code itself, I am currently trying to leverage
> > FieldReferenceExpression to also handle nested fields for filter
> > pushdown. Where I am struggling to make progress is this: once the
> > filters are pushed to the table source, in
> > PushFilterIntoSourceScanRuleBase#resolveFiltersAndCreateTableSourceTable
> > there is a conversion from List<ResolvedExpression> (in this case
> > FieldReferenceExpression) to List<RexNode>.
> >
> > If you have some pointers for that, please let me know. Thanks.
> >
> > Regards
> > Venkata krishnan
> >
> > On Sun, Aug 6, 2023 at 10:23 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> >
> > > Thanks @zhengyunhon...@gmail.com
> > > Regards
> > > Venkata krishnan
> > >
> > > On Sun, Aug 6, 2023 at 6:16 PM yh z <zhengyunhon...@gmail.com> wrote:
> > >
> > > > Hi, Venkatakrishnan,
> > > > I think this is a very useful feature. I have been focusing on the
> > > > development of the flink-table-planner module recently, so if you
> > > > need some help, I can assist you with some of the sub-tasks or
> > > > with code review.
> > > >
> > > > Returning to the design itself, I think it's necessary to modify
> > > > FieldReferenceExpression or to implement a new
> > > > NestedFieldReferenceExpression. As for modifying the interface of
> > > > SupportsProjectionPushDown, I think we need to make some
> > > > trade-offs. For a connector developer, the stability of the
> > > > interface is very important. If there are no unresolved bugs, I
> > > > personally do not recommend modifying the interface. However, when
> > > > I first read the code of SupportsProjectionPushDown, the design of
> > > > int[][] was very confusing to me, and it took me a long time to
> > > > understand it by running specific unit tests. Therefore, in terms
> > > > of the design of this interface and the consistency between
> > > > different interfaces, there is indeed room for improvement.
> > > >
> > > > Thanks,
> > > > Yunhong Zheng (Swuferhong)
> > > >
> > > > On Thu, Aug 3, 2023 at 07:44, Becket Qin <becket....@gmail.com> wrote:
> > > >
> > > > > Hi Jark,
> > > > >
> > > > > If the FieldReferenceExpression contains an int[] to support a
> > > > > nested field reference, List<FieldReferenceExpression> (or
> > > > > FieldReferenceExpression[]) and int[][] are actually equivalent.
> > > > > If we were designing this from scratch, I would personally
> > > > > prefer List<FieldReferenceExpression> for consistency, i.e.
> > > > > always resolving everything to expressions for users. Projection
> > > > > is a simpler case, but should not be a special case. This avoids
> > > > > doing the same thing in different ways, which also confuses
> > > > > users. To me, the int[][] format would become a kind of
> > > > > technical debt after we extend the FieldReferenceExpression.
> > > > > Although we don't have to address it right away in the same
> > > > > FLIP, this kind of debt accumulates over time and makes the
> > > > > project harder to learn and maintain. So, personally, I prefer
> > > > > to address these technical debts as soon as possible.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Wed, Aug 2, 2023 at 8:19 PM Jark Wu <imj...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I agree with Becket that we may need to extend
> > > > > > FieldReferenceExpression to support nested field access (or
> > > > > > maybe add a new NestedFieldReferenceExpression).
> > > > > > But I have some concerns about evolving
> > > > > > SupportsProjectionPushDown.applyProjection.
> > > > > > A projection is much simpler than a filter expression: it
> > > > > > only needs to represent the field indexes.
> > > > > > If we evolve `applyProjection` to accept
> > > > > > `List<FieldReferenceExpression> projectedFields`, users have
> > > > > > to convert the `List<FieldReferenceExpression>` back to
> > > > > > int[][], which is an overhead for users.
> > > > > > Field indexes (int[][]) are required to project schemas with
> > > > > > the utility org.apache.flink.table.connector.Projection.
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > > On Wed, 2 Aug 2023 at 07:40, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > > > > >
> > > > > > > Thanks Becket for the suggestion. That makes sense. Let me
> > > > > > > try it out and get back to you.
> > > > > > >
> > > > > > > Regards
> > > > > > > Venkata krishnan
> > > > > > >
> > > > > > > On Tue, Aug 1, 2023 at 9:04 AM Becket Qin <becket....@gmail.com> wrote:
> > > > > > >
> > > > > > > > This is a very useful feature in practice.
> > > > > > > >
> > > > > > > > It looks to me that the key issue here is that Flink's
> > > > > > > > ResolvedExpression does not have the necessary
> > > > > > > > abstraction for nested field access. So the Calcite
> > > > > > > > RexFieldAccess does not have a counterpart in
> > > > > > > > ResolvedExpression. The FieldReferenceExpression only
> > > > > > > > supports direct access to the fields, not nested access.
> > > > > > > >
> > > > > > > > Theoretically speaking, this nested field reference is
> > > > > > > > also required by projection pushdown. However, we
> > > > > > > > addressed that by using an int[][] in the
> > > > > > > > SupportsProjectionPushDown interface. Maybe we can do the
> > > > > > > > following:
> > > > > > > >
> > > > > > > > 1. Extend the FieldReferenceExpression to include an
> > > > > > > > int[] for nested field access.
> > > > > > > > 2. By doing (1),
> > > > > > > > SupportsFilterPushDown#applyFilters(List<ResolvedExpression>)
> > > > > > > > can support nested field access.
> > > > > > > > 3. Evolve
> > > > > > > > SupportsProjectionPushDown.applyProjection(int[][]
> > > > > > > > projectedFields, DataType producedDataType) to
> > > > > > > > applyProjection(List<FieldReferenceExpression>
> > > > > > > > projectedFields, DataType producedDataType).
> > > > > > > >
> > > > > > > > This will need a FLIP.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jiangjie (Becket) Qin
> > > > > > > >
> > > > > > > > On Tue, Aug 1, 2023 at 11:42 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > > > > > > >
> > > > > > > > > Thanks for the response. Looking forward to your
> > > > > > > > > pointers. In the meantime, let me figure out how we
> > > > > > > > > can implement it. Will keep you posted.
> > > > > > > > >
> > > > > > > > > On Mon, Jul 31, 2023, 11:43 PM liu ron <ron9....@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi, Venkata
> > > > > > > > > >
> > > > > > > > > > Thanks for reporting this issue. Currently, Flink
> > > > > > > > > > doesn't support nested filter pushdown. I also
> > > > > > > > > > think that this optimization would be useful,
> > > > > > > > > > especially for jobs that need to read a lot of
> > > > > > > > > > data from Parquet or ORC files. We didn't move
> > > > > > > > > > forward with this for priority reasons.
> > > > > > > > > >
> > > > > > > > > > Regarding your three questions, I will respond
> > > > > > > > > > later, after my on-call is finished, because I need
> > > > > > > > > > to dive into the source code. About your commit, I
> > > > > > > > > > don't think it's the right solution, because
> > > > > > > > > > FieldReferenceExpression doesn't currently support
> > > > > > > > > > nested field filter pushdown; maybe we need to
> > > > > > > > > > extend it.
> > > > > > > > > >
> > > > > > > > > > You can also look further into reasonable
> > > > > > > > > > solutions, which we'll discuss later on.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Ron
> > > > > > > > > >
> > > > > > > > > > On Sat, Jul 29, 2023 at 03:31, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi all,
> > > > > > > > > > >
> > > > > > > > > > > Currently, I am working on adding support for
> > > > > > > > > > > nested fields filter pushdown. In our use case,
> > > > > > > > > > > running Flink in batch mode, we found nested
> > > > > > > > > > > fields filter pushdown is key - without it, jobs
> > > > > > > > > > > are significantly slower. Note: Spark SQL
> > > > > > > > > > > supports nested fields filter pushdown.
> > > > > > > > > > >
> > > > > > > > > > > While debugging the code using
> > > > > > > > > > > IcebergTableSource as the table source, I
> > > > > > > > > > > narrowed down the issue to missing support for
> > > > > > > > > > > RexNodeExtractor#RexNodeToExpressionConverter#visitFieldAccess.
> > > > > > > > > > > As part of fixing it, I made changes to return
> > > > > > > > > > > an Option(FieldReferenceExpression) with
> > > > > > > > > > > appropriate references to the parent index and
> > > > > > > > > > > the child index for the nested field, along with
> > > > > > > > > > > the data type info.
> > > > > > > > > > >
> > > > > > > > > > > But this new ResolvedExpression cannot be
> > > > > > > > > > > converted to RexNode, which happens in
> > > > > > > > > > > PushFilterIntoSourceScanRuleBase
> > > > > > > > > > > <https://urldefense.com/v3/__https://github.com/apache/flink/blob/3f63e03e83144e9857834f8db1895637d2aa218a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java*L104__;Iw!!IKRxdwAv5BmarQ!fNgxcul8ZGwkNE9ygOeVGlWlU6m_MLMXf4A3S3oQu9LBzYTPF90pZ7uXSGMr-5dFmzRn37-e9Q5cMnVs$>
> > > > > > > > > > > .
> > > > > > > > > > >
> > > > > > > > > > > A few questions:
> > > > > > > > > > >
> > > > > > > > > > > 1. Does FieldReferenceExpression support nested
> > > > > > > > > > > fields currently, or should it be extended to
> > > > > > > > > > > support nested fields? I couldn't figure this
> > > > > > > > > > > out from the PushProjectIntoTableScanRule that
> > > > > > > > > > > supports nested column projection pushdown.
> > > > > > > > > > > 2. ExpressionConverter
> > > > > > > > > > > <https://urldefense.com/v3/__https://github.com/apache/flink/blob/3f63e03e83144e9857834f8db1895637d2aa218a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java*L197__;Iw!!IKRxdwAv5BmarQ!fNgxcul8ZGwkNE9ygOeVGlWlU6m_MLMXf4A3S3oQu9LBzYTPF90pZ7uXSGMr-5dFmzRn37-e9Z6jnkJm$>
> > > > > > > > > > > converts ResolvedExpression -> RexNode, but the
> > > > > > > > > > > new FieldReferenceExpression with the nested
> > > > > > > > > > > field cannot be converted to RexNode. This is
> > > > > > > > > > > why the answer to the 1st question is key.
> > > > > > > > > > > 3. Is there anything else that I'm missing here?
> > > > > > > > > > > Or is there an even easier way to add support
> > > > > > > > > > > for nested fields filter pushdown?
> > > > > > > > > > >
> > > > > > > > > > > Partially working changes - Commit
> > > > > > > > > > > <https://urldefense.com/v3/__https://github.com/venkata91/flink/commit/00cdf34ecf9be3ba669a97baaed4b69b85cd26f9__;!!IKRxdwAv5BmarQ!fNgxcul8ZGwkNE9ygOeVGlWlU6m_MLMXf4A3S3oQu9LBzYTPF90pZ7uXSGMr-5dFmzRn37-e9XeOjJ_a$>
> > > > > > > > > > > Please feel free to leave a comment directly in
> > > > > > > > > > > the commit.
> > > > > > > > > > >
> > > > > > > > > > > Any pointers here would be much appreciated!
> > > > > > > > > > > Thanks in advance.
> > > > > > > > > > >
> > > > > > > > > > > Disclaimer: I am relatively new to the Flink
> > > > > > > > > > > code base, especially the Table planner :-).
> > > > > > > > > > >
> > > > > > > > > > > Regards
> > > > > > > > > > > Venkata krishnan