zeroshade commented on code in PR #735:
URL: https://github.com/apache/iceberg-go/pull/735#discussion_r2943057564
##########
table/arrow_scanner.go:
##########
@@ -385,6 +385,91 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ ncols := int(batch.NumCols())
+ nrows := batch.NumRows()
+ newCols := make([]arrow.Array, ncols)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ for i := 0; i < ncols; i++ {
+ if i == rowIDColIdx && task.FirstRowID != nil {
+ // _row_id: inherit first_row_id + row_position when
null; else keep value from file.
+ if col, ok := batch.Column(i).(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ first := *task.FirstRowID
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(first + *rowOffset
+ k)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+ newCols[i] = bldr.NewArray()
+ bldr.Release()
Review Comment:
Reuse the builder and use `Reserve`, since we already know the size we're
going to append.
##########
table/arrow_scanner.go:
##########
@@ -385,6 +385,91 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ ncols := int(batch.NumCols())
+ nrows := batch.NumRows()
+ newCols := make([]arrow.Array, ncols)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ for i := 0; i < ncols; i++ {
+ if i == rowIDColIdx && task.FirstRowID != nil {
+ // _row_id: inherit first_row_id + row_position when
null; else keep value from file.
+ if col, ok := batch.Column(i).(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ first := *task.FirstRowID
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(first + *rowOffset
+ k)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+ newCols[i] = bldr.NewArray()
+ bldr.Release()
+
+ continue
+ }
+ }
+
+ if i == seqNumColIdx && task.DataSequenceNumber != nil {
+ // _last_updated_sequence_number: inherit file's
data_sequence_number when null; else keep value from file.
+ if col, ok := batch.Column(i).(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ seq := *task.DataSequenceNumber
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(seq)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+ newCols[i] = bldr.NewArray()
+ bldr.Release()
+
+ continue
+ }
+ }
+
+ col := batch.Column(i)
+ col.Retain()
Review Comment:
This seems like an extra `Retain` that isn't necessary. Since you're going to
add them to the record batch, you can just `defer col.Release()` on the new
columns you build instead of needing the release loop below.
##########
table/arrow_scanner.go:
##########
@@ -385,6 +385,91 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ ncols := int(batch.NumCols())
+ nrows := batch.NumRows()
+ newCols := make([]arrow.Array, ncols)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ for i := 0; i < ncols; i++ {
+ if i == rowIDColIdx && task.FirstRowID != nil {
Review Comment:
I'm confused — if we already know `rowIDColIdx` to compare against, why do
we need the loop?
##########
table/arrow_scanner.go:
##########
@@ -385,6 +385,91 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ ncols := int(batch.NumCols())
+ nrows := batch.NumRows()
+ newCols := make([]arrow.Array, ncols)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ for i := 0; i < ncols; i++ {
+ if i == rowIDColIdx && task.FirstRowID != nil {
+ // _row_id: inherit first_row_id + row_position when
null; else keep value from file.
+ if col, ok := batch.Column(i).(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ first := *task.FirstRowID
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(first + *rowOffset
+ k)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+ newCols[i] = bldr.NewArray()
+ bldr.Release()
+
+ continue
+ }
+ }
+
+ if i == seqNumColIdx && task.DataSequenceNumber != nil {
+ // _last_updated_sequence_number: inherit file's
data_sequence_number when null; else keep value from file.
+ if col, ok := batch.Column(i).(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ seq := *task.DataSequenceNumber
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(seq)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+ newCols[i] = bldr.NewArray()
+ bldr.Release()
Review Comment:
Same comment as above.
##########
table/arrow_scanner.go:
##########
@@ -385,6 +385,91 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ ncols := int(batch.NumCols())
+ nrows := batch.NumRows()
+ newCols := make([]arrow.Array, ncols)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ for i := 0; i < ncols; i++ {
+ if i == rowIDColIdx && task.FirstRowID != nil {
+ // _row_id: inherit first_row_id + row_position when
null; else keep value from file.
+ if col, ok := batch.Column(i).(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ first := *task.FirstRowID
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(first + *rowOffset
+ k)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+ newCols[i] = bldr.NewArray()
+ bldr.Release()
+
+ continue
+ }
+ }
+
+ if i == seqNumColIdx && task.DataSequenceNumber != nil {
Review Comment:
Same as above — why do we need to loop and find the column if we already know
`seqNumColIdx` to look for?
##########
table/arrow_scanner.go:
##########
@@ -513,6 +598,17 @@ func (as *arrowScan) recordsFromTask(ctx context.Context,
task internal.Enumerat
return ToRequestedSchema(ctx, as.projectedSchema, iceSchema, r,
false, false, as.useLargeTypes)
})
+ // Row lineage (v3): fill _row_id and _last_updated_sequence_number
from task constants when in projection.
+ if task.Value.FirstRowID != nil || task.Value.DataSequenceNumber != nil
{
+ var rowOffset int64
+ taskVal := task.Value
+ pipeline = append(pipeline, func(r arrow.RecordBatch)
(arrow.RecordBatch, error) {
+ defer r.Release()
+
+ return synthesizeRowLineageColumns(ctx, &rowOffset,
taskVal, r)
Review Comment:
I haven't double-checked the spec, but should the row lineage columns be
toggleable via a setting? I.e., is there a way to turn them off if you don't
want them to show up in the results?
##########
table/scanner.go:
##########
@@ -474,12 +474,20 @@ func (scan *Scan) PlanFiles(ctx context.Context)
([]FileScanTask, error) {
if err != nil {
return nil, err
}
- results = append(results, FileScanTask{
+ task := FileScanTask{
File: e.DataFile(),
DeleteFiles: deleteFiles,
Start: 0,
Length: e.DataFile().FileSizeBytes(),
- })
+ }
+ // Row lineage constants for v3: readers use these to
synthesize _row_id and _last_updated_sequence_number.
+ if scan.metadata.Version() >= 3 {
+ task.FirstRowID = e.DataFile().FirstRowID()
+ if fseq := e.FileSequenceNum(); fseq != nil {
+ task.DataSequenceNumber = fseq
+ }
+ }
Review Comment:
Any reason not to always populate these? At worst we're just setting them to
nil if we're not >= v3.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]