zeroshade commented on code in PR #735:
URL: https://github.com/apache/iceberg-go/pull/735#discussion_r3001934611


##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       nrows := batch.NumRows()
+
+       // Start from the existing columns; we'll replace the row lineage 
columns in-place
+       // when we need to synthesize values.
+       newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       var toRelease []arrow.Array
+
+       // _row_id: inherit first_row_id + row_position when null; else keep 
value from file.
+       if rowIDColIdx >= 0 && task.FirstRowID != nil {
+               if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+                       bldr := array.NewInt64Builder(alloc)
+                       defer bldr.Release()
+
+                       bldr.Reserve(int(nrows))
+                       first := *task.FirstRowID
+                       for k := int64(0); k < nrows; k++ {
+                               if col.IsNull(int(k)) {
+                                       bldr.Append(first + *rowOffset + k)
+                               } else {
+                                       bldr.Append(col.Value(int(k)))
+                               }
+                       }
+
+                       arr := bldr.NewArray()
+                       newCols[rowIDColIdx] = arr
+                       toRelease = append(toRelease, arr)
+               }
+       }
+
+       // _last_updated_sequence_number: inherit file's data_sequence_number 
when null; else keep value from file.
+       if seqNumColIdx >= 0 && task.DataSequenceNumber != nil {
+               if col, ok := newCols[seqNumColIdx].(*array.Int64); ok {
+                       bldr := array.NewInt64Builder(alloc)
+                       defer bldr.Release()

Review Comment:
   As in the `_row_id` branch, reuse the single hoisted `Int64Builder` here instead of allocating a second builder.



##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       nrows := batch.NumRows()
+
+       // Start from the existing columns; we'll replace the row lineage 
columns in-place
+       // when we need to synthesize values.
+       newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       var toRelease []arrow.Array
+
+       // _row_id: inherit first_row_id + row_position when null; else keep 
value from file.
+       if rowIDColIdx >= 0 && task.FirstRowID != nil {
+               if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+                       bldr := array.NewInt64Builder(alloc)
+                       defer bldr.Release()

Review Comment:
   We can reuse this builder. Hoist this out of the ifs so it looks more like:
   
   ```go
   bldr := array.NewInt64Builder(alloc)
   defer bldr.Release()
   
   if rowIDColIdx >= 0 && task.FirstRowID != nil {
       if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
           ...
       }
   }
   
   ...
   ```



##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       nrows := batch.NumRows()
+
+       // Start from the existing columns; we'll replace the row lineage 
columns in-place
+       // when we need to synthesize values.
+       newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       var toRelease []arrow.Array
+
+       // _row_id: inherit first_row_id + row_position when null; else keep 
value from file.
+       if rowIDColIdx >= 0 && task.FirstRowID != nil {
+               if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+                       bldr := array.NewInt64Builder(alloc)
+                       defer bldr.Release()
+
+                       bldr.Reserve(int(nrows))
+                       first := *task.FirstRowID
+                       for k := int64(0); k < nrows; k++ {
+                               if col.IsNull(int(k)) {
+                                       bldr.Append(first + *rowOffset + k)
+                               } else {
+                                       bldr.Append(col.Value(int(k)))
+                               }
+                       }
+
+                       arr := bldr.NewArray()
+                       newCols[rowIDColIdx] = arr
+                       toRelease = append(toRelease, arr)
+               }
+       }
+
+       // _last_updated_sequence_number: inherit file's data_sequence_number 
when null; else keep value from file.
+       if seqNumColIdx >= 0 && task.DataSequenceNumber != nil {
+               if col, ok := newCols[seqNumColIdx].(*array.Int64); ok {
+                       bldr := array.NewInt64Builder(alloc)
+                       defer bldr.Release()
+
+                       bldr.Reserve(int(nrows))
+                       seq := *task.DataSequenceNumber
+                       for k := int64(0); k < nrows; k++ {

Review Comment:
   ```suggestion
                        for k := range nrows {
   ```



##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       nrows := batch.NumRows()
+
+       // Start from the existing columns; we'll replace the row lineage 
columns in-place
+       // when we need to synthesize values.
+       newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       var toRelease []arrow.Array

Review Comment:
   Instead of collecting the new arrays in `toRelease`, just call
   `defer arr.Release()` immediately after adding each one to `newCols`.



##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       nrows := batch.NumRows()
+
+       // Start from the existing columns; we'll replace the row lineage 
columns in-place
+       // when we need to synthesize values.
+       newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       var toRelease []arrow.Array
+
+       // _row_id: inherit first_row_id + row_position when null; else keep 
value from file.
+       if rowIDColIdx >= 0 && task.FirstRowID != nil {
+               if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+                       bldr := array.NewInt64Builder(alloc)
+                       defer bldr.Release()
+
+                       bldr.Reserve(int(nrows))
+                       first := *task.FirstRowID
+                       for k := int64(0); k < nrows; k++ {
+                               if col.IsNull(int(k)) {
+                                       bldr.Append(first + *rowOffset + k)
+                               } else {
+                                       bldr.Append(col.Value(int(k)))
+                               }
+                       }
+
+                       arr := bldr.NewArray()
+                       newCols[rowIDColIdx] = arr
+                       toRelease = append(toRelease, arr)
+               }
+       }
+
+       // _last_updated_sequence_number: inherit file's data_sequence_number 
when null; else keep value from file.
+       if seqNumColIdx >= 0 && task.DataSequenceNumber != nil {
+               if col, ok := newCols[seqNumColIdx].(*array.Int64); ok {
+                       bldr := array.NewInt64Builder(alloc)
+                       defer bldr.Release()
+
+                       bldr.Reserve(int(nrows))
+                       seq := *task.DataSequenceNumber
+                       for k := int64(0); k < nrows; k++ {
+                               if col.IsNull(int(k)) {
+                                       bldr.Append(seq)
+                               } else {
+                                       bldr.Append(col.Value(int(k)))
+                               }
+                       }
+
+                       arr := bldr.NewArray()
+                       newCols[seqNumColIdx] = arr
+                       toRelease = append(toRelease, arr)

Review Comment:
   ```suggestion
                        defer arr.Release()
   ```



##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       nrows := batch.NumRows()
+
+       // Start from the existing columns; we'll replace the row lineage 
columns in-place
+       // when we need to synthesize values.
+       newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       var toRelease []arrow.Array
+
+       // _row_id: inherit first_row_id + row_position when null; else keep 
value from file.
+       if rowIDColIdx >= 0 && task.FirstRowID != nil {
+               if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+                       bldr := array.NewInt64Builder(alloc)
+                       defer bldr.Release()
+
+                       bldr.Reserve(int(nrows))
+                       first := *task.FirstRowID
+                       for k := int64(0); k < nrows; k++ {

Review Comment:
   ```suggestion
                        for k := range nrows {
   ```
   



##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       nrows := batch.NumRows()
+
+       // Start from the existing columns; we'll replace the row lineage 
columns in-place
+       // when we need to synthesize values.
+       newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       var toRelease []arrow.Array
+
+       // _row_id: inherit first_row_id + row_position when null; else keep 
value from file.
+       if rowIDColIdx >= 0 && task.FirstRowID != nil {
+               if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+                       bldr := array.NewInt64Builder(alloc)
+                       defer bldr.Release()
+
+                       bldr.Reserve(int(nrows))
+                       first := *task.FirstRowID
+                       for k := int64(0); k < nrows; k++ {
+                               if col.IsNull(int(k)) {
+                                       bldr.Append(first + *rowOffset + k)
+                               } else {
+                                       bldr.Append(col.Value(int(k)))
+                               }
+                       }
+
+                       arr := bldr.NewArray()
+                       newCols[rowIDColIdx] = arr
+                       toRelease = append(toRelease, arr)

Review Comment:
   ```suggestion
                        defer arr.Release()
   ```



##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, 
fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number 
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). 
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited 
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is 
kept.
+// rowOffset is the 0-based row index within the current file and is updated 
so _row_id stays
+// correct across multiple batches from the same file (first_row_id + 
row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       nrows := batch.NumRows()
+
+       // Start from the existing columns; we'll replace the row lineage 
columns in-place
+       // when we need to synthesize values.
+       newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := 
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       var toRelease []arrow.Array
+
+       // _row_id: inherit first_row_id + row_position when null; else keep 
value from file.
+       if rowIDColIdx >= 0 && task.FirstRowID != nil {
+               if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+                       bldr := array.NewInt64Builder(alloc)
+                       defer bldr.Release()
+
+                       bldr.Reserve(int(nrows))
+                       first := *task.FirstRowID
+                       for k := int64(0); k < nrows; k++ {
+                               if col.IsNull(int(k)) {
+                                       bldr.Append(first + *rowOffset + k)
+                               } else {
+                                       bldr.Append(col.Value(int(k)))
+                               }
+                       }
+
+                       arr := bldr.NewArray()
+                       newCols[rowIDColIdx] = arr
+                       toRelease = append(toRelease, arr)
+               }
+       }
+
+       // _last_updated_sequence_number: inherit file's data_sequence_number 
when null; else keep value from file.
+       if seqNumColIdx >= 0 && task.DataSequenceNumber != nil {
+               if col, ok := newCols[seqNumColIdx].(*array.Int64); ok {
+                       bldr := array.NewInt64Builder(alloc)
+                       defer bldr.Release()
+
+                       bldr.Reserve(int(nrows))
+                       seq := *task.DataSequenceNumber
+                       for k := int64(0); k < nrows; k++ {
+                               if col.IsNull(int(k)) {
+                                       bldr.Append(seq)
+                               } else {
+                                       bldr.Append(col.Value(int(k)))
+                               }
+                       }
+
+                       arr := bldr.NewArray()
+                       newCols[seqNumColIdx] = arr
+                       toRelease = append(toRelease, arr)
+               }
+       }
+
+       // Advance so the next batch from this file uses the correct row 
position for _row_id.
+       *rowOffset += nrows
+
+       rec := array.NewRecordBatch(schema, newCols, nrows)
+       for _, c := range toRelease {
+               c.Release()
+       }

Review Comment:
   You don't need this release loop anymore if you `defer arr.Release()`
   immediately at the point where you call `NewArray`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to