tanmayrauth commented on code in PR #1099:
URL: https://github.com/apache/iceberg-go/pull/1099#discussion_r3291870459


##########
table/transaction.go:
##########
@@ -1798,3 +1905,53 @@ func (s *StagedTable) Refresh(ctx context.Context) 
(*Table, error) {
 func (s *StagedTable) Scan(opts ...ScanOption) *Scan {
        panic(fmt.Errorf("%w: cannot scan a staged table", ErrInvalidOperation))
 }
+
+// prepareBatchFilter binds the given Iceberg filter against schema and 
converts
+// it to substrait once, returning a per-batch filter function that can be
+// reused across every record batch. The setup work (BindExpr, ConvertExpr) is
+// independent of the batch and is the most expensive part of filter-eval, so
+// hoisting it out of the iterator loop is a measurable win on rewrites that
+// produce many batches.
+//
+// The returned function takes ownership of the input batch (it releases it on
+// the AlwaysFalse fast-path) and returns a possibly-new batch the caller is
+// responsible for releasing.
+func prepareBatchFilter(ctx context.Context, filter iceberg.BooleanExpression, 
schema *iceberg.Schema, caseSensitive bool) (func(arrow.RecordBatch) 
(arrow.RecordBatch, error), error) {
+       if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+               return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
+                       return rec, nil
+               }, nil
+       }
+
+       bound, err := iceberg.BindExpr(schema, filter, caseSensitive)
+       if err != nil {
+               return nil, fmt.Errorf("prepareBatchFilter: bind expression: 
%w", err)
+       }
+
+       if bound == nil {
+               return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
+                       return rec, nil
+               }, nil
+       }
+
+       if bound.Equals(iceberg.AlwaysFalse{}) {
+               // Return a record with the same schema and zero rows. 
NewSlice(0, 0)
+               // preserves the per-field arrays so downstream code that calls
+               // rec.Column(i) does not panic on the empty result.
+               return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
+                       defer rec.Release()
+
+                       return rec.NewSlice(0, 0), nil
+               }, nil
+       }
+
+       extSet, substraitFilter, err := substrait.ConvertExpr(schema, bound, 
caseSensitive)
+       if err != nil {
+               return nil, fmt.Errorf("prepareBatchFilter: convert expression: 
%w", err)
+       }
+
+       ctx = exprs.WithExtensionIDSet(ctx, 
exprs.NewExtensionSetDefault(*extSet))

Review Comment:
   Changed the closure signature to func(context.Context, arrow.RecordBatch) — 
extSet is pre-computed once and reused, but the per-call ctx (with 
allocator/deadline) is attached inside the closure on each  call. Also added a 
one-line comment on the positional-binding assumption (lineage fields at 
indices N..N+1, filter bound against 0..N-1).



##########
table/scanner.go:
##########
@@ -261,26 +262,96 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
                }
        }
 
-       if slices.Contains(scan.selectedFields, "*") {
-               return curSchema, nil
+       if scan.includeRowLineage && curVersion < minFormatVersionRowLineage {
+               return nil, fmt.Errorf("%w: row lineage requires format version 
%d, table is v%d",
+                       ErrInvalidOperation, minFormatVersionRowLineage, 
curVersion)
        }
 
-       selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields, 
caseSensitive)
-       schemaMeta := metaFieldsFromSchema(curSchema)
-       synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
-       if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
+       var schema *iceberg.Schema
+       if slices.Contains(scan.selectedFields, "*") {
+               schema = curSchema
+       } else {
+               // Intercept row-lineage metadata column names (_row_id,
+               // _last_updated_sequence_number) before calling Select: they 
are
+               // reserved and never appear in the user schema's fields, so
+               // Select would fail with "could not find column" on v3 tables
+               // where they are otherwise legal to project. The scanner reads
+               // them from file metadata (or synthesizes them) at scan time;
+               // here we just need to ensure they survive into the projection.
+               //
+               // On v1/v2 tables, Select is left to fail naturally — those
+               // versions don't have row lineage, so requesting these columns
+               // is an error.
+               userFields, lineageFields := 
splitLineageMetadataFields(scan.selectedFields, scan.caseSensitive)
+               if len(lineageFields) > 0 && curVersion < 
minFormatVersionRowLineage {
+                       userFields = scan.selectedFields
+                       lineageFields = nil
+               }
 
-               // synthesis path
-               removedMetaSlice, missingMetaFields := 
removeMetadataFromSelectedFields(scan.selectedFields, synthesisMeta)
-               sch, err := curSchema.Select(scan.caseSensitive, 
removedMetaSlice...)
+               var err error
+               schema, err = curSchema.Select(scan.caseSensitive, 
userFields...)
                if err != nil {
                        return nil, err
                }
+               if len(lineageFields) > 0 {
+                       schema = appendMissingLineageFields(schema, 
lineageFields)
+               }
+       }
+
+       if scan.includeRowLineage {

Review Comment:
   Did the smaller piece — skipping appendMissingLineageFields when 
scan.includeRowLineage is set so the redundant pass goes away. Pushed back on 
the full canonicalization (auto-setting  includeRowLineage=true whenever any 
lineage name appears in selectedFields): selecting only _row_id today gives you 
only _row_id, but with the rewrite it would also pull in 
_last_updated_sequence_number.  That's a behavior change for the legacy 
Select(_row_id) escape hatch. The maintainability concern (third column down 
the road) is mostly handled by splitLineageMetadataFields + 
SchemaWithRowLineage already centralizing the column list.



##########
table/scanner.go:
##########
@@ -261,26 +262,96 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
                }
        }
 
-       if slices.Contains(scan.selectedFields, "*") {
-               return curSchema, nil
+       if scan.includeRowLineage && curVersion < minFormatVersionRowLineage {
+               return nil, fmt.Errorf("%w: row lineage requires format version 
%d, table is v%d",
+                       ErrInvalidOperation, minFormatVersionRowLineage, 
curVersion)
        }
 
-       selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields, 
caseSensitive)
-       schemaMeta := metaFieldsFromSchema(curSchema)
-       synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
-       if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
+       var schema *iceberg.Schema
+       if slices.Contains(scan.selectedFields, "*") {
+               schema = curSchema
+       } else {
+               // Intercept row-lineage metadata column names (_row_id,
+               // _last_updated_sequence_number) before calling Select: they 
are
+               // reserved and never appear in the user schema's fields, so
+               // Select would fail with "could not find column" on v3 tables
+               // where they are otherwise legal to project. The scanner reads
+               // them from file metadata (or synthesizes them) at scan time;
+               // here we just need to ensure they survive into the projection.
+               //
+               // On v1/v2 tables, Select is left to fail naturally — those
+               // versions don't have row lineage, so requesting these columns
+               // is an error.
+               userFields, lineageFields := 
splitLineageMetadataFields(scan.selectedFields, scan.caseSensitive)
+               if len(lineageFields) > 0 && curVersion < 
minFormatVersionRowLineage {

Review Comment:
   Done, explicit ErrInvalidOperation with the format version named, rather 
than relying on Select's ErrInvalidSchema. Updated 
TestProjectionRowLineageRejectedOnV1V2 to match.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to