zeroshade commented on code in PR #759:
URL: https://github.com/apache/iceberg-go/pull/759#discussion_r2874804960


##########
table/rolling_data_writer.go:
##########
@@ -27,47 +27,193 @@ import (
 
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       tblutils "github.com/apache/iceberg-go/table/internal"
+       "github.com/google/uuid"
 )
 
-// WriterFactory manages the creation and lifecycle of RollingDataWriter instances
+// writerFactory manages the creation and lifecycle of RollingDataWriter instances
 // for different partitions, providing shared configuration and coordination
 // across all writers in a partitioned write operation.
 type writerFactory struct {
-       rootLocation       string
-       args               recordWritingArgs
-       meta               *MetadataBuilder
-       taskSchema         *iceberg.Schema
-       targetFileSize     int64
-       writers            sync.Map
-       nextCount          func() (int, bool)
-       stopCount          func()
-       partitionIDCounter atomic.Int64 // partitionIDCounter generates unique IDs for partitions
-       mu                 sync.Mutex
+       rootLocation   string
+       rootURL        *url.URL
+       fs             iceio.WriteFileIO
+       writeUUID      *uuid.UUID
+       taskSchema     *iceberg.Schema
+       targetFileSize int64
+
+       locProvider LocationProvider
+       fileSchema  *iceberg.Schema
+       arrowSchema *arrow.Schema
+       writeProps  any
+       statsCols   map[int]tblutils.StatisticsCollector
+       currentSpec iceberg.PartitionSpec
+       format      tblutils.FileFormat
+       content     iceberg.ManifestEntryContent
+
+       writers               sync.Map
+       partitionLocProviders sync.Map
+       nextCount             func() (int, bool)
+       stopCount             func()
+       countMu               sync.Mutex
+       partitionIDCounter    atomic.Int64
+       mu                    sync.Mutex
 }
 
-// NewWriterFactory creates a new WriterFactory with the specified configuration
-// for managing rolling data writerFactory across partitions.
-func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) writerFactory {
+type writerFactoryOption func(*writerFactory)
+
+func withContentType(content iceberg.ManifestEntryContent) writerFactoryOption {
+       return func(w *writerFactory) {
+               w.content = content
+       }
+}
+
+func withFactoryFileSchema(schema *iceberg.Schema) writerFactoryOption {
+       return func(w *writerFactory) {
+               w.fileSchema = schema
+               arrowSc, err := SchemaToArrowSchema(schema, nil, true, false)
+               if err != nil {
+                       panic(fmt.Sprintf("withFactoryFileSchema: failed to convert schema: %v", err))
+               }
+               w.arrowSchema = arrowSc
+       }
+}
+
+// newWriterFactory creates a writerFactory with precomputed, invariant write
+// configuration derived from the table metadata.
+func newWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64, opts ...writerFactoryOption) (*writerFactory, error) {
        nextCount, stopCount := iter.Pull(args.counter)
 
-       return writerFactory{
+       rootURL, err := url.Parse(rootLocation)
+       if err != nil {
+               stopCount()
+
+               return nil, err
+       }
+
+       locProvider, err := LoadLocationProvider(rootLocation, meta.props)
+       if err != nil {
+               stopCount()
+
+               return nil, err
+       }
+
+       fileSchema := meta.CurrentSchema()
+       sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
+       if err != nil {
+               stopCount()
+
+               return nil, err
+       }
+       if !sanitized.Equals(fileSchema) {
+               fileSchema = sanitized
+       }
+
+       format := tblutils.GetFileFormat(iceberg.ParquetFile)

Review Comment:
   this shouldn't be hardcoded as `Parquet`, should it? Shouldn't we get this from a table config / write config property?
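   
   A rough sketch of what I mean (`resolveFileFormat` is a hypothetical helper; the property key is the spec's `write.format.default`, and the other identifiers come from this file):
   
   ```go
   // resolveFileFormat picks the write format from the table properties,
   // falling back to Parquet (the spec default) when the property is unset.
   func resolveFileFormat(props iceberg.Properties) (tblutils.FileFormat, error) {
       name := props["write.format.default"]
       if name == "" {
           name = string(iceberg.ParquetFile)
       }
       switch iceberg.FileFormat(strings.ToUpper(name)) {
       case iceberg.ParquetFile:
           return tblutils.GetFileFormat(iceberg.ParquetFile), nil
       default:
           // no avro/orc write path exists yet, so fail loudly instead
           // of silently falling back to parquet
           return nil, fmt.Errorf("unsupported write format: %s", name)
       }
   }
   ```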



##########
table/rolling_data_writer.go:
##########
@@ -27,47 +27,193 @@ import (
 
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       tblutils "github.com/apache/iceberg-go/table/internal"
+       "github.com/google/uuid"
 )
 
-// WriterFactory manages the creation and lifecycle of RollingDataWriter instances
+// writerFactory manages the creation and lifecycle of RollingDataWriter instances
 // for different partitions, providing shared configuration and coordination
 // across all writers in a partitioned write operation.
 type writerFactory struct {
-       rootLocation       string
-       args               recordWritingArgs
-       meta               *MetadataBuilder
-       taskSchema         *iceberg.Schema
-       targetFileSize     int64
-       writers            sync.Map
-       nextCount          func() (int, bool)
-       stopCount          func()
-       partitionIDCounter atomic.Int64 // partitionIDCounter generates unique IDs for partitions
-       mu                 sync.Mutex
+       rootLocation   string
+       rootURL        *url.URL
+       fs             iceio.WriteFileIO
+       writeUUID      *uuid.UUID
+       taskSchema     *iceberg.Schema
+       targetFileSize int64
+
+       locProvider LocationProvider
+       fileSchema  *iceberg.Schema
+       arrowSchema *arrow.Schema
+       writeProps  any
+       statsCols   map[int]tblutils.StatisticsCollector
+       currentSpec iceberg.PartitionSpec
+       format      tblutils.FileFormat
+       content     iceberg.ManifestEntryContent
+
+       writers               sync.Map
+       partitionLocProviders sync.Map
+       nextCount             func() (int, bool)
+       stopCount             func()
+       countMu               sync.Mutex
+       partitionIDCounter    atomic.Int64
+       mu                    sync.Mutex
 }
 
-// NewWriterFactory creates a new WriterFactory with the specified configuration
-// for managing rolling data writerFactory across partitions.
-func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) writerFactory {
+type writerFactoryOption func(*writerFactory)
+
+func withContentType(content iceberg.ManifestEntryContent) writerFactoryOption {
+       return func(w *writerFactory) {
+               w.content = content
+       }
+}
+
+func withFactoryFileSchema(schema *iceberg.Schema) writerFactoryOption {
+       return func(w *writerFactory) {
+               w.fileSchema = schema
+               arrowSc, err := SchemaToArrowSchema(schema, nil, true, false)
+               if err != nil {
+                       panic(fmt.Sprintf("withFactoryFileSchema: failed to convert schema: %v", err))
+               }
+               w.arrowSchema = arrowSc
+       }
+}
+
+// newWriterFactory creates a writerFactory with precomputed, invariant write
+// configuration derived from the table metadata.
+func newWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64, opts ...writerFactoryOption) (*writerFactory, error) {
        nextCount, stopCount := iter.Pull(args.counter)
 
-       return writerFactory{
+       rootURL, err := url.Parse(rootLocation)
+       if err != nil {
+               stopCount()
+
+               return nil, err
+       }
+
+       locProvider, err := LoadLocationProvider(rootLocation, meta.props)
+       if err != nil {
+               stopCount()
+
+               return nil, err
+       }
+
+       fileSchema := meta.CurrentSchema()
+       sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
+       if err != nil {
+               stopCount()
+
+               return nil, err
+       }
+       if !sanitized.Equals(fileSchema) {
+               fileSchema = sanitized
+       }
+
+       format := tblutils.GetFileFormat(iceberg.ParquetFile)
+
+       arrowSchema, err := SchemaToArrowSchema(fileSchema, nil, true, false)
+       if err != nil {
+               stopCount()
+
+               return nil, err
+       }
+
+       currentSpec, err := meta.CurrentSpec()
+       if err != nil || currentSpec == nil {
+               stopCount()
+
+               return nil, fmt.Errorf("%w: cannot write files without a current spec", err)
+       }
+
+       f := &writerFactory{
                rootLocation:   rootLocation,
-               args:           args,
-               meta:           meta,
+               rootURL:        rootURL,
+               fs:             args.fs,
+               writeUUID:      args.writeUUID,
                taskSchema:     taskSchema,
                targetFileSize: targetFileSize,
+               locProvider:    locProvider,
+               fileSchema:     fileSchema,
+               arrowSchema:    arrowSchema,
+               writeProps:     format.GetWriteProperties(meta.props),
+               currentSpec:    *currentSpec,
+               format:         format,
                nextCount:      nextCount,
                stopCount:      stopCount,
        }
+       for _, apply := range opts {
+               apply(f)
+       }
+
+       f.statsCols, err = computeStatsPlan(f.fileSchema, meta.props)
+       if err != nil {
+               stopCount()
+
+               return nil, err
+       }
+
+       return f, nil
+}
+
+func (w *writerFactory) openFileWriter(ctx context.Context, partitionPath string,
+       partitionValues map[int]any, partitionID int, fileCount int,
+) (tblutils.FileWriter, error) {
+       w.countMu.Lock()
+       cnt, _ := w.nextCount()
+       w.countMu.Unlock()
+
+       fileName := WriteTask{
+               Uuid:        *w.writeUUID,
+               ID:          cnt,
+               PartitionID: partitionID,
+               FileCount:   fileCount,
+       }.GenerateDataFileName("parquet")

Review Comment:
   same as above, shouldn't we pull `"parquet"` from the config options instead of hardcoding it?
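   
   Sketch of one option (`fileExtension` is a hypothetical helper; the real fix would feed `GenerateDataFileName` from whatever format the factory resolved out of the table properties):
   
   ```go
   // fileExtension derives the data-file extension from the resolved
   // format instead of a hardcoded literal, e.g. "PARQUET" -> "parquet"
   func fileExtension(format iceberg.FileFormat) string {
       return strings.ToLower(string(format))
   }
   ```
   
   and then `GenerateDataFileName(fileExtension(resolvedFormat))` here.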



##########
table/internal/parquet_files.go:
##########
@@ -243,46 +244,105 @@ func (parquetFormat) GetWriteProperties(props iceberg.Properties) any {
                parquet.WithCompressionLevel(compressionLevel))
 }
 
-func (p parquetFormat) WriteDataFile(ctx context.Context, fs iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches []arrow.RecordBatch) (_ iceberg.DataFile, err error) {
+func (p parquetFormat) WriteDataFile(ctx context.Context, fs iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches []arrow.RecordBatch) (iceberg.DataFile, error) {
+       w, err := p.NewFileWriter(ctx, fs, partitionValues, info, batches[0].Schema())
+       if err != nil {
+               return nil, err
+       }
+
+       for _, batch := range batches {
+               if err := w.Write(batch); err != nil {
+                       w.Close()
+
+                       return nil, err
+               }
+       }
+
+       return w.Close()
+}
+
+// ParquetFileWriter is an incremental single-file writer with open/write/close
+// lifecycle. It writes Arrow record batches to a Parquet file and tracks bytes
+// written for rolling file decisions.
+type ParquetFileWriter struct {
+       pqWriter   *pqarrow.FileWriter
+       counter    *internal.CountingWriter
+       fileCloser io.Closer
+       format     parquetFormat
+       info       WriteFileInfo
+       partition  map[int]any
+       colMapping map[string]int
+}
+
+// NewFileWriter creates a ParquetFileWriter that writes batches to a single
+// Parquet file. Call Write to append batches, BytesWritten to check actual
+// compressed file size, and Close to finalize and get the resulting DataFile.
+func (p parquetFormat) NewFileWriter(ctx context.Context, fs iceio.WriteFileIO,
+       partitionValues map[int]any, info WriteFileInfo, arrowSchema *arrow.Schema,
+) (FileWriter, error) {
        fw, err := fs.Create(info.FileName)
        if err != nil {
                return nil, err
        }
 
-       defer internal.CheckedClose(fw, &err)
+       colMapping, err := p.PathToIDMapping(info.FileSchema)
+       if err != nil {
+               fw.Close()
 
-       cntWriter := internal.CountingWriter{W: fw}
+               return nil, err
+       }
+
+       counter := &internal.CountingWriter{W: fw}
        mem := compute.GetAllocator(ctx)
        writerProps := parquet.NewWriterProperties(info.WriteProps.([]parquet.WriterProperty)...)
        arrProps := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem), pqarrow.WithStoreSchema())
 
-       writer, err := pqarrow.NewFileWriter(batches[0].Schema(), &cntWriter, writerProps, arrProps)
+       writer, err := pqarrow.NewFileWriter(arrowSchema, counter, writerProps, arrProps)
        if err != nil {
+               fw.Close()
+
                return nil, err
        }
 
-       for _, batch := range batches {
-               if err := writer.WriteBuffered(batch); err != nil {
-                       return nil, err
-               }
-       }
+       return &ParquetFileWriter{
+               pqWriter:   writer,
+               counter:    counter,
+               fileCloser: fw,
+               format:     p,
+               info:       info,
+               partition:  partitionValues,
+               colMapping: colMapping,
+       }, nil
+}
 
-       if err := writer.Close(); err != nil {
-               return nil, err
-       }
+// Write appends a record batch to the Parquet file.
+func (w *ParquetFileWriter) Write(batch arrow.RecordBatch) error {
+       return w.pqWriter.WriteBuffered(batch)
+}
 
-       filemeta, err := writer.FileMetadata()
-       if err != nil {
+// BytesWritten returns flushed bytes plus compressed bytes buffered in the
+// current row group — matching the size estimate used by iceberg-java and
+// iceberg-rust to make rolling decisions.
+func (w *ParquetFileWriter) BytesWritten() int64 {
+       return w.counter.Count + w.pqWriter.RowGroupTotalCompressedBytes()

Review Comment:
   isn't this going to double-count, since `RowGroupTotalCompressedBytes` would also count what has been flushed, not only what's still buffered?
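   
   If so, one way to count only the buffered remainder (a sketch; it assumes pqarrow's `RowGroupTotalBytesWritten` reports the already-flushed portion of the current row group, which is worth verifying):
   
   ```go
   func (w *ParquetFileWriter) BytesWritten() int64 {
       // counter.Count covers everything flushed to the sink, so only add
       // the compressed bytes still buffered in the open row group
       buffered := w.pqWriter.RowGroupTotalCompressedBytes() - w.pqWriter.RowGroupTotalBytesWritten()
       if buffered < 0 {
           buffered = 0
       }
       return w.counter.Count + buffered
   }
   ```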



##########
table/rolling_data_writer.go:
##########
@@ -141,70 +284,79 @@ func (r *RollingDataWriter) stream(outputDataFilesCh chan<- iceberg.DataFile) {
        defer r.wg.Done()
        defer close(r.errorCh)
 
-       recordIter := func(yield func(arrow.RecordBatch, error) bool) {
-               for record := range r.recordCh {
-                       if !yield(record, nil) {
-                               return
-                       }
+       var currentWriter tblutils.FileWriter
+       defer func() {
+               if currentWriter != nil {
+                       currentWriter.Close()
                }
-       }
+       }()
 
-       binPackedRecords := binPackRecords(recordIter, defaultBinPackLookback, r.factory.targetFileSize)

Review Comment:
   why are we dropping the bin-packing of the records? I don't see it being used elsewhere. Unless the user explicitly says they are providing sorted records and sorted data, we should allow the bin-packing to keep file sizes down, right? Or is there a reason why we're removing it?
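   
   For reference, roughly what the dropped step was doing (a simplified sketch: error handling and the lookback window of the real `binPackRecords` are elided, and I'm assuming arrow-go v18's `util.TotalRecordSize` for the size estimate):
   
   ```go
   // group incoming batches so each group's estimated in-memory size
   // approximates targetFileSize, letting each group become one data file
   func binPack(records iter.Seq[arrow.RecordBatch], targetFileSize int64) iter.Seq[[]arrow.RecordBatch] {
       return func(yield func([]arrow.RecordBatch) bool) {
           var group []arrow.RecordBatch
           var size int64
           for rec := range records {
               group = append(group, rec)
               size += util.TotalRecordSize(rec)
               if size >= targetFileSize {
                   if !yield(group) {
                       return
                   }
                   group, size = nil, 0
               }
           }
           if len(group) > 0 {
               yield(group)
           }
       }
   }
   ```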



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

