This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 7b2ea9b0 chore: pass writerFactory to newPartitionedFanoutWriter (#743)
7b2ea9b0 is described below
commit 7b2ea9b0393dccccefeffed93095fc44723066f6
Author: Tobias Pütz <[email protected]>
AuthorDate: Fri Feb 20 18:52:18 2026 +0100
chore: pass writerFactory to newPartitionedFanoutWriter (#743)
Just a small cleanup to improve ergonomics: the only two call sites of
newPartitionedFanoutWriter were setting the writers field immediately
after instantiation. Passing the factory to the constructor makes it
harder to write incorrect code that forgets to set it.
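For context, both call sites move from a construct-then-assign pattern to
a single constructor call. Below is a minimal before/after sketch; spec,
schema, itr, and factory are simplified stand-ins for the real arguments
used in the table package:

    // Before: the factory had to be assigned after construction,
    // which was easy to forget.
    w := newPartitionedFanoutWriter(spec, schema, itr)
    w.writers = &factory

    // After: the factory is a required constructor argument,
    // so omitting it no longer compiles.
    w := newPartitionedFanoutWriter(spec, schema, itr, &factory)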
---
table/arrow_utils.go | 3 +--
table/metadata.go | 6 +++---
table/partitioned_fanout_writer.go | 11 ++++++-----
table/partitioned_fanout_writer_test.go | 4 +---
table/rolling_data_writer.go | 2 +-
table/snapshot_producers_test.go | 12 ++++++------
6 files changed, 18 insertions(+), 20 deletions(-)
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 35d3b538..e599735a 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -1360,9 +1360,8 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata
return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks)
} else {
- partitionWriter := newPartitionedFanoutWriter(*currentSpec, meta.CurrentSchema(), args.itr)
rollingDataWriters := NewWriterFactory(rootLocation, args, meta, taskSchema, targetFileSize)
- partitionWriter.writers = &rollingDataWriters
+ partitionWriter := newPartitionedFanoutWriter(*currentSpec, meta.CurrentSchema(), args.itr, &rollingDataWriters)
workers := config.EnvConfig.MaxWorkers
return partitionWriter.Write(ctx, workers)
diff --git a/table/metadata.go b/table/metadata.go
index 4df78e32..2dcdf6c1 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -72,7 +72,7 @@ type Metadata interface {
// table is created. Implementations must throw an exception if a table's
// UUID does not match the expected UUID after refreshing metadata.
TableUUID() uuid.UUID
// Location is the table's base location. This is used by writers to determine
// where to store data files, manifest files, and table metadata files.
Location() string
// LastUpdatedMillis is the timestamp in milliseconds from the unix epoch when
@@ -95,7 +95,7 @@ type Metadata interface {
// PartitionSpecByID returns the partition spec with the given ID. Returns
// nil if the ID is not found in the list of partition specs.
PartitionSpecByID(int) *iceberg.PartitionSpec
// DefaultPartitionSpec is the ID of the current spec that writers should
// use by default.
DefaultPartitionSpec() int
// LastPartitionSpecID is the highest assigned partition field ID across
@@ -126,7 +126,7 @@ type Metadata interface {
SortOrder() SortOrder
// SortOrders returns the list of sort orders in the table.
SortOrders() []SortOrder
// DefaultSortOrder returns the ID of the current sort order that writers
// should use by default.
DefaultSortOrder() int
// Properties is a string to string map of table properties. This is used
diff --git a/table/partitioned_fanout_writer.go b/table/partitioned_fanout_writer.go
index 838513f5..624c5ad5 100644
--- a/table/partitioned_fanout_writer.go
+++ b/table/partitioned_fanout_writer.go
@@ -39,7 +39,7 @@ type partitionedFanoutWriter struct {
partitionSpec iceberg.PartitionSpec
schema *iceberg.Schema
itr iter.Seq2[arrow.RecordBatch, error]
- writers *writerFactory
+ writerFactory *writerFactory
}
// PartitionInfo holds the row indices and partition values for a specific partition,
@@ -51,12 +51,13 @@ type partitionInfo struct {
}
// NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the specified
-// partition specification, schema, and record iterator.
-func newPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.RecordBatch, error]) *partitionedFanoutWriter {
+// partition specification, schema, record iterator, and writerFactory.
+func newPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.RecordBatch, error], writerFactory *writerFactory) *partitionedFanoutWriter {
return &partitionedFanoutWriter{
partitionSpec: partitionSpec,
schema: schema,
itr: itr,
+ writerFactory: writerFactory,
}
}
@@ -136,7 +137,7 @@ func (p *partitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-c
}
partitionPath := p.partitionPath(val.partitionRec)
- rollingDataWriter, err := p.writers.getOrCreateRollingDataWriter(ctx, partitionPath, val.partitionValues, dataFilesChannel)
+ rollingDataWriter, err := p.writerFactory.getOrCreateRollingDataWriter(ctx, partitionPath, val.partitionValues, dataFilesChannel)
if err != nil {
return err
}
@@ -157,7 +158,7 @@ func (p *partitionedFanoutWriter) yieldDataFiles(fanoutWorkers *errgroup.Group,
go func() {
defer close(outputDataFilesCh)
err := fanoutWorkers.Wait()
- err = errors.Join(err, p.writers.closeAll())
+ err = errors.Join(err, p.writerFactory.closeAll())
errCh <- err
close(errCh)
}()
diff --git a/table/partitioned_fanout_writer_test.go b/table/partitioned_fanout_writer_test.go
index b7e0f6d8..734e31c7 100644
--- a/table/partitioned_fanout_writer_test.go
+++ b/table/partitioned_fanout_writer_test.go
@@ -134,10 +134,8 @@ func (s *FanoutWriterTestSuite) testTransformPartition(transform iceberg.Transfo
taskSchema, err := ArrowSchemaToIceberg(args.sc, false, nameMapping)
s.Require().NoError(err)
- partitionWriter := newPartitionedFanoutWriter(spec, taskSchema, args.itr)
rollingDataWriters := NewWriterFactory(loc, args, metaBuilder, icebergSchema, 1024*1024)
-
- partitionWriter.writers = &rollingDataWriters
+ partitionWriter := newPartitionedFanoutWriter(spec, taskSchema, args.itr, &rollingDataWriters)
workers := config.EnvConfig.MaxWorkers
dataFiles := partitionWriter.Write(s.ctx, workers)
diff --git a/table/rolling_data_writer.go b/table/rolling_data_writer.go
index 020effdd..35d42ebc 100644
--- a/table/rolling_data_writer.go
+++ b/table/rolling_data_writer.go
@@ -46,7 +46,7 @@ type writerFactory struct {
}
// NewWriterFactory creates a new WriterFactory with the specified configuration
// for managing rolling data writers across partitions.
func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) writerFactory {
nextCount, stopCount := iter.Pull(args.counter)
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index 025e0b59..9c008886 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -606,7 +606,7 @@ func TestManifestWriterClosesUnderlyingFile(t *testing.T) {
require.Len(t, manifests, 1, "should have one manifest")
unclosed := trackIO.GetUnclosedWriters()
require.Empty(t, unclosed, "all file writers should be closed, but these are still open: %v", unclosed)
}
// TestCreateManifestClosesUnderlyingFile tests that createManifest properly
@@ -636,7 +636,7 @@ func TestCreateManifestClosesUnderlyingFile(t *testing.T) {
require.NoError(t, err, "createManifest should succeed")
unclosed := trackIO.GetUnclosedWriters()
require.Empty(t, unclosed, "all file writers should be closed after createManifest, but these are still open: %v", unclosed)
}
// TestOverwriteExistingManifestsClosesUnderlyingFile tests that existingManifests
@@ -688,11 +688,11 @@ func TestOverwriteExistingManifestsClosesUnderlyingFile(t *testing.T) {
require.NoError(t, err, "existingManifests should succeed")
unclosed := trackIO.GetUnclosedWriters()
require.Empty(t, unclosed, "all file writers should be closed after existingManifests, but these are still open: %v", unclosed)
}
// errorOnDeletedEntries is a producerImpl that returns an error from deletedEntries()
// to test that file writers are properly closed even when deletedEntries fails.
type errorOnDeletedEntries struct {
base *snapshotProducer
err error
@@ -743,7 +743,7 @@ func (b *blockingTrackingIO) Create(name string) (iceio.FileWriter, error) {
return writer, err
}
// This test verifies that NO writers are created when deletedEntries() fails,
// because the error should be returned before any goroutines start.
func TestManifestsClosesWriterWhenDeletedEntriesFails(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
@@ -781,6 +781,6 @@ func TestManifestsClosesWriterWhenDeletedEntriesFails(t *testing.T) {
case <-time.After(100 * time.Millisecond):
writerCount := blockingIO.GetWriterCount()
require.Zero(t, writerCount, "expected no writers to be created when deletedEntries is called first")
}
}