zeroshade commented on code in PR #735:
URL: https://github.com/apache/iceberg-go/pull/735#discussion_r2943057564


##########
table/arrow_scanner.go:
##########
@@ -385,6 +385,91 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       ncols := int(batch.NumCols())
+       nrows := batch.NumRows()
+       newCols := make([]arrow.Array, ncols)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       for i := 0; i < ncols; i++ {
+               if i == rowIDColIdx && task.FirstRowID != nil {
+                       // _row_id: inherit first_row_id + row_position when 
null; else keep value from file.
+                       if col, ok := batch.Column(i).(*array.Int64); ok {
+                               bldr := array.NewInt64Builder(alloc)
+                               first := *task.FirstRowID
+                               for k := int64(0); k < nrows; k++ {
+                                       if col.IsNull(int(k)) {
+                                               bldr.Append(first + *rowOffset 
+ k)
+                                       } else {
+                                               bldr.Append(col.Value(int(k)))
+                                       }
+                               }
+                               newCols[i] = bldr.NewArray()
+                               bldr.Release()

Review Comment:
   Reuse the builder and call `Reserve`, since we already know how many values we're going to append.



##########
table/arrow_scanner.go:
##########
@@ -385,6 +385,91 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       ncols := int(batch.NumCols())
+       nrows := batch.NumRows()
+       newCols := make([]arrow.Array, ncols)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       for i := 0; i < ncols; i++ {
+               if i == rowIDColIdx && task.FirstRowID != nil {
+                       // _row_id: inherit first_row_id + row_position when 
null; else keep value from file.
+                       if col, ok := batch.Column(i).(*array.Int64); ok {
+                               bldr := array.NewInt64Builder(alloc)
+                               first := *task.FirstRowID
+                               for k := int64(0); k < nrows; k++ {
+                                       if col.IsNull(int(k)) {
+                                               bldr.Append(first + *rowOffset 
+ k)
+                                       } else {
+                                               bldr.Append(col.Value(int(k)))
+                                       }
+                               }
+                               newCols[i] = bldr.NewArray()
+                               bldr.Release()
+
+                               continue
+                       }
+               }
+
+               if i == seqNumColIdx && task.DataSequenceNumber != nil {
+                       // _last_updated_sequence_number: inherit file's 
data_sequence_number when null; else keep value from file.
+                       if col, ok := batch.Column(i).(*array.Int64); ok {
+                               bldr := array.NewInt64Builder(alloc)
+                               seq := *task.DataSequenceNumber
+                               for k := int64(0); k < nrows; k++ {
+                                       if col.IsNull(int(k)) {
+                                               bldr.Append(seq)
+                                       } else {
+                                               bldr.Append(col.Value(int(k)))
+                                       }
+                               }
+                               newCols[i] = bldr.NewArray()
+                               bldr.Release()
+
+                               continue
+                       }
+               }
+
+               col := batch.Column(i)
+               col.Retain()

Review Comment:
   This seems like an extra `Retain` that isn't necessary. Since you're going to 
add the columns to the record batch, you can just `defer col.Release()` on the new 
columns you build instead of needing the release loop below.



##########
table/arrow_scanner.go:
##########
@@ -385,6 +385,91 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       ncols := int(batch.NumCols())
+       nrows := batch.NumRows()
+       newCols := make([]arrow.Array, ncols)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       for i := 0; i < ncols; i++ {
+               if i == rowIDColIdx && task.FirstRowID != nil {

Review Comment:
   I'm confused — if we already know `rowIDColIdx` to compare against, why do 
we need the loop?



##########
table/arrow_scanner.go:
##########
@@ -385,6 +385,91 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       ncols := int(batch.NumCols())
+       nrows := batch.NumRows()
+       newCols := make([]arrow.Array, ncols)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       for i := 0; i < ncols; i++ {
+               if i == rowIDColIdx && task.FirstRowID != nil {
+                       // _row_id: inherit first_row_id + row_position when 
null; else keep value from file.
+                       if col, ok := batch.Column(i).(*array.Int64); ok {
+                               bldr := array.NewInt64Builder(alloc)
+                               first := *task.FirstRowID
+                               for k := int64(0); k < nrows; k++ {
+                                       if col.IsNull(int(k)) {
+                                               bldr.Append(first + *rowOffset 
+ k)
+                                       } else {
+                                               bldr.Append(col.Value(int(k)))
+                                       }
+                               }
+                               newCols[i] = bldr.NewArray()
+                               bldr.Release()
+
+                               continue
+                       }
+               }
+
+               if i == seqNumColIdx && task.DataSequenceNumber != nil {
+                       // _last_updated_sequence_number: inherit file's 
data_sequence_number when null; else keep value from file.
+                       if col, ok := batch.Column(i).(*array.Int64); ok {
+                               bldr := array.NewInt64Builder(alloc)
+                               seq := *task.DataSequenceNumber
+                               for k := int64(0); k < nrows; k++ {
+                                       if col.IsNull(int(k)) {
+                                               bldr.Append(seq)
+                                       } else {
+                                               bldr.Append(col.Value(int(k)))
+                                       }
+                               }
+                               newCols[i] = bldr.NewArray()
+                               bldr.Release()

Review Comment:
   Same comment as above.



##########
table/arrow_scanner.go:
##########
@@ -385,6 +385,91 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       ncols := int(batch.NumCols())
+       nrows := batch.NumRows()
+       newCols := make([]arrow.Array, ncols)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       for i := 0; i < ncols; i++ {
+               if i == rowIDColIdx && task.FirstRowID != nil {
+                       // _row_id: inherit first_row_id + row_position when 
null; else keep value from file.
+                       if col, ok := batch.Column(i).(*array.Int64); ok {
+                               bldr := array.NewInt64Builder(alloc)
+                               first := *task.FirstRowID
+                               for k := int64(0); k < nrows; k++ {
+                                       if col.IsNull(int(k)) {
+                                               bldr.Append(first + *rowOffset 
+ k)
+                                       } else {
+                                               bldr.Append(col.Value(int(k)))
+                                       }
+                               }
+                               newCols[i] = bldr.NewArray()
+                               bldr.Release()
+
+                               continue
+                       }
+               }
+
+               if i == seqNumColIdx && task.DataSequenceNumber != nil {

Review Comment:
   Same as above: why do we need to loop to find the column if we already know 
`seqNumColIdx` to look for?



##########
table/arrow_scanner.go:
##########
@@ -513,6 +598,17 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, 
task internal.Enumerat
                return ToRequestedSchema(ctx, as.projectedSchema, iceSchema, r, 
false, false, as.useLargeTypes)
        })
 
+       // Row lineage (v3): fill _row_id and _last_updated_sequence_number 
from task constants when in projection.
+       if task.Value.FirstRowID != nil || task.Value.DataSequenceNumber != nil 
{
+               var rowOffset int64
+               taskVal := task.Value
+               pipeline = append(pipeline, func(r arrow.RecordBatch) 
(arrow.RecordBatch, error) {
+                       defer r.Release()
+
+                       return synthesizeRowLineageColumns(ctx, &rowOffset, 
taskVal, r)

Review Comment:
   I haven't double-checked the spec, but should the row-lineage columns be 
toggleable via a setting — i.e., a way to turn them off if you don't want them to 
show up in the results?



##########
table/scanner.go:
##########
@@ -474,12 +474,20 @@ func (scan *Scan) PlanFiles(ctx context.Context) 
([]FileScanTask, error) {
                if err != nil {
                        return nil, err
                }
-               results = append(results, FileScanTask{
+               task := FileScanTask{
                        File:        e.DataFile(),
                        DeleteFiles: deleteFiles,
                        Start:       0,
                        Length:      e.DataFile().FileSizeBytes(),
-               })
+               }
+               // Row lineage constants for v3: readers use these to 
synthesize _row_id and _last_updated_sequence_number.
+               if scan.metadata.Version() >= 3 {
+                       task.FirstRowID = e.DataFile().FirstRowID()
+                       if fseq := e.FileSequenceNum(); fseq != nil {
+                               task.DataSequenceNumber = fseq
+                       }
+               }

Review Comment:
   Any reason not to always populate these? At worst we're just setting them to 
nil when we're not on v3 or later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to