zeroshade commented on code in PR #759:
URL: https://github.com/apache/iceberg-go/pull/759#discussion_r2874804960
##########
table/rolling_data_writer.go:
##########
@@ -27,47 +27,193 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ tblutils "github.com/apache/iceberg-go/table/internal"
+ "github.com/google/uuid"
)
-// WriterFactory manages the creation and lifecycle of RollingDataWriter
instances
+// writerFactory manages the creation and lifecycle of RollingDataWriter
instances
// for different partitions, providing shared configuration and coordination
// across all writers in a partitioned write operation.
type writerFactory struct {
- rootLocation string
- args recordWritingArgs
- meta *MetadataBuilder
- taskSchema *iceberg.Schema
- targetFileSize int64
- writers sync.Map
- nextCount func() (int, bool)
- stopCount func()
- partitionIDCounter atomic.Int64 // partitionIDCounter generates unique
IDs for partitions
- mu sync.Mutex
+ rootLocation string
+ rootURL *url.URL
+ fs iceio.WriteFileIO
+ writeUUID *uuid.UUID
+ taskSchema *iceberg.Schema
+ targetFileSize int64
+
+ locProvider LocationProvider
+ fileSchema *iceberg.Schema
+ arrowSchema *arrow.Schema
+ writeProps any
+ statsCols map[int]tblutils.StatisticsCollector
+ currentSpec iceberg.PartitionSpec
+ format tblutils.FileFormat
+ content iceberg.ManifestEntryContent
+
+ writers sync.Map
+ partitionLocProviders sync.Map
+ nextCount func() (int, bool)
+ stopCount func()
+ countMu sync.Mutex
+ partitionIDCounter atomic.Int64
+ mu sync.Mutex
}
-// NewWriterFactory creates a new WriterFactory with the specified
configuration
-// for managing rolling data writerFactory across partitions.
-func NewWriterFactory(rootLocation string, args recordWritingArgs, meta
*MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64)
writerFactory {
+type writerFactoryOption func(*writerFactory)
+
+func withContentType(content iceberg.ManifestEntryContent) writerFactoryOption
{
+ return func(w *writerFactory) {
+ w.content = content
+ }
+}
+
+func withFactoryFileSchema(schema *iceberg.Schema) writerFactoryOption {
+ return func(w *writerFactory) {
+ w.fileSchema = schema
+ arrowSc, err := SchemaToArrowSchema(schema, nil, true, false)
+ if err != nil {
+ panic(fmt.Sprintf("withFactoryFileSchema: failed to
convert schema: %v", err))
+ }
+ w.arrowSchema = arrowSc
+ }
+}
+
+// newWriterFactory creates a writerFactory with precomputed, invariant write
+// configuration derived from the table metadata.
+func newWriterFactory(rootLocation string, args recordWritingArgs, meta
*MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64, opts
...writerFactoryOption) (*writerFactory, error) {
nextCount, stopCount := iter.Pull(args.counter)
- return writerFactory{
+ rootURL, err := url.Parse(rootLocation)
+ if err != nil {
+ stopCount()
+
+ return nil, err
+ }
+
+ locProvider, err := LoadLocationProvider(rootLocation, meta.props)
+ if err != nil {
+ stopCount()
+
+ return nil, err
+ }
+
+ fileSchema := meta.CurrentSchema()
+ sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
+ if err != nil {
+ stopCount()
+
+ return nil, err
+ }
+ if !sanitized.Equals(fileSchema) {
+ fileSchema = sanitized
+ }
+
+ format := tblutils.GetFileFormat(iceberg.ParquetFile)
Review Comment:
This shouldn't be hardcoded as `Parquet`, should it? Shouldn't we get this
from a table config/write config property?
##########
table/rolling_data_writer.go:
##########
@@ -27,47 +27,193 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ tblutils "github.com/apache/iceberg-go/table/internal"
+ "github.com/google/uuid"
)
-// WriterFactory manages the creation and lifecycle of RollingDataWriter
instances
+// writerFactory manages the creation and lifecycle of RollingDataWriter
instances
// for different partitions, providing shared configuration and coordination
// across all writers in a partitioned write operation.
type writerFactory struct {
- rootLocation string
- args recordWritingArgs
- meta *MetadataBuilder
- taskSchema *iceberg.Schema
- targetFileSize int64
- writers sync.Map
- nextCount func() (int, bool)
- stopCount func()
- partitionIDCounter atomic.Int64 // partitionIDCounter generates unique
IDs for partitions
- mu sync.Mutex
+ rootLocation string
+ rootURL *url.URL
+ fs iceio.WriteFileIO
+ writeUUID *uuid.UUID
+ taskSchema *iceberg.Schema
+ targetFileSize int64
+
+ locProvider LocationProvider
+ fileSchema *iceberg.Schema
+ arrowSchema *arrow.Schema
+ writeProps any
+ statsCols map[int]tblutils.StatisticsCollector
+ currentSpec iceberg.PartitionSpec
+ format tblutils.FileFormat
+ content iceberg.ManifestEntryContent
+
+ writers sync.Map
+ partitionLocProviders sync.Map
+ nextCount func() (int, bool)
+ stopCount func()
+ countMu sync.Mutex
+ partitionIDCounter atomic.Int64
+ mu sync.Mutex
}
-// NewWriterFactory creates a new WriterFactory with the specified
configuration
-// for managing rolling data writerFactory across partitions.
-func NewWriterFactory(rootLocation string, args recordWritingArgs, meta
*MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64)
writerFactory {
+type writerFactoryOption func(*writerFactory)
+
+func withContentType(content iceberg.ManifestEntryContent) writerFactoryOption
{
+ return func(w *writerFactory) {
+ w.content = content
+ }
+}
+
+func withFactoryFileSchema(schema *iceberg.Schema) writerFactoryOption {
+ return func(w *writerFactory) {
+ w.fileSchema = schema
+ arrowSc, err := SchemaToArrowSchema(schema, nil, true, false)
+ if err != nil {
+ panic(fmt.Sprintf("withFactoryFileSchema: failed to
convert schema: %v", err))
+ }
+ w.arrowSchema = arrowSc
+ }
+}
+
+// newWriterFactory creates a writerFactory with precomputed, invariant write
+// configuration derived from the table metadata.
+func newWriterFactory(rootLocation string, args recordWritingArgs, meta
*MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64, opts
...writerFactoryOption) (*writerFactory, error) {
nextCount, stopCount := iter.Pull(args.counter)
- return writerFactory{
+ rootURL, err := url.Parse(rootLocation)
+ if err != nil {
+ stopCount()
+
+ return nil, err
+ }
+
+ locProvider, err := LoadLocationProvider(rootLocation, meta.props)
+ if err != nil {
+ stopCount()
+
+ return nil, err
+ }
+
+ fileSchema := meta.CurrentSchema()
+ sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
+ if err != nil {
+ stopCount()
+
+ return nil, err
+ }
+ if !sanitized.Equals(fileSchema) {
+ fileSchema = sanitized
+ }
+
+ format := tblutils.GetFileFormat(iceberg.ParquetFile)
+
+ arrowSchema, err := SchemaToArrowSchema(fileSchema, nil, true, false)
+ if err != nil {
+ stopCount()
+
+ return nil, err
+ }
+
+ currentSpec, err := meta.CurrentSpec()
+ if err != nil || currentSpec == nil {
+ stopCount()
+
+ return nil, fmt.Errorf("%w: cannot write files without a
current spec", err)
+ }
+
+ f := &writerFactory{
rootLocation: rootLocation,
- args: args,
- meta: meta,
+ rootURL: rootURL,
+ fs: args.fs,
+ writeUUID: args.writeUUID,
taskSchema: taskSchema,
targetFileSize: targetFileSize,
+ locProvider: locProvider,
+ fileSchema: fileSchema,
+ arrowSchema: arrowSchema,
+ writeProps: format.GetWriteProperties(meta.props),
+ currentSpec: *currentSpec,
+ format: format,
nextCount: nextCount,
stopCount: stopCount,
}
+ for _, apply := range opts {
+ apply(f)
+ }
+
+ f.statsCols, err = computeStatsPlan(f.fileSchema, meta.props)
+ if err != nil {
+ stopCount()
+
+ return nil, err
+ }
+
+ return f, nil
+}
+
+func (w *writerFactory) openFileWriter(ctx context.Context, partitionPath
string,
+ partitionValues map[int]any, partitionID int, fileCount int,
+) (tblutils.FileWriter, error) {
+ w.countMu.Lock()
+ cnt, _ := w.nextCount()
+ w.countMu.Unlock()
+
+ fileName := WriteTask{
+ Uuid: *w.writeUUID,
+ ID: cnt,
+ PartitionID: partitionID,
+ FileCount: fileCount,
+ }.GenerateDataFileName("parquet")
Review Comment:
Same as above: shouldn't we pull `"parquet"` from the config options instead
of hardcoding it?
##########
table/internal/parquet_files.go:
##########
@@ -243,46 +244,105 @@ func (parquetFormat) GetWriteProperties(props
iceberg.Properties) any {
parquet.WithCompressionLevel(compressionLevel))
}
-func (p parquetFormat) WriteDataFile(ctx context.Context, fs
iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches
[]arrow.RecordBatch) (_ iceberg.DataFile, err error) {
+func (p parquetFormat) WriteDataFile(ctx context.Context, fs
iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches
[]arrow.RecordBatch) (iceberg.DataFile, error) {
+ w, err := p.NewFileWriter(ctx, fs, partitionValues, info,
batches[0].Schema())
+ if err != nil {
+ return nil, err
+ }
+
+ for _, batch := range batches {
+ if err := w.Write(batch); err != nil {
+ w.Close()
+
+ return nil, err
+ }
+ }
+
+ return w.Close()
+}
+
+// ParquetFileWriter is an incremental single-file writer with open/write/close
+// lifecycle. It writes Arrow record batches to a Parquet file and tracks bytes
+// written for rolling file decisions.
+type ParquetFileWriter struct {
+ pqWriter *pqarrow.FileWriter
+ counter *internal.CountingWriter
+ fileCloser io.Closer
+ format parquetFormat
+ info WriteFileInfo
+ partition map[int]any
+ colMapping map[string]int
+}
+
+// NewFileWriter creates a ParquetFileWriter that writes batches to a single
+// Parquet file. Call Write to append batches, BytesWritten to check actual
+// compressed file size, and Close to finalize and get the resulting DataFile.
+func (p parquetFormat) NewFileWriter(ctx context.Context, fs iceio.WriteFileIO,
+ partitionValues map[int]any, info WriteFileInfo, arrowSchema
*arrow.Schema,
+) (FileWriter, error) {
fw, err := fs.Create(info.FileName)
if err != nil {
return nil, err
}
- defer internal.CheckedClose(fw, &err)
+ colMapping, err := p.PathToIDMapping(info.FileSchema)
+ if err != nil {
+ fw.Close()
- cntWriter := internal.CountingWriter{W: fw}
+ return nil, err
+ }
+
+ counter := &internal.CountingWriter{W: fw}
mem := compute.GetAllocator(ctx)
writerProps :=
parquet.NewWriterProperties(info.WriteProps.([]parquet.WriterProperty)...)
arrProps :=
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem),
pqarrow.WithStoreSchema())
- writer, err := pqarrow.NewFileWriter(batches[0].Schema(), &cntWriter,
writerProps, arrProps)
+ writer, err := pqarrow.NewFileWriter(arrowSchema, counter, writerProps,
arrProps)
if err != nil {
+ fw.Close()
+
return nil, err
}
- for _, batch := range batches {
- if err := writer.WriteBuffered(batch); err != nil {
- return nil, err
- }
- }
+ return &ParquetFileWriter{
+ pqWriter: writer,
+ counter: counter,
+ fileCloser: fw,
+ format: p,
+ info: info,
+ partition: partitionValues,
+ colMapping: colMapping,
+ }, nil
+}
- if err := writer.Close(); err != nil {
- return nil, err
- }
+// Write appends a record batch to the Parquet file.
+func (w *ParquetFileWriter) Write(batch arrow.RecordBatch) error {
+ return w.pqWriter.WriteBuffered(batch)
+}
- filemeta, err := writer.FileMetadata()
- if err != nil {
+// BytesWritten returns flushed bytes plus compressed bytes buffered in the
+// current row group — matching the size estimate used by iceberg-java and
+// iceberg-rust to make rolling decisions.
+func (w *ParquetFileWriter) BytesWritten() int64 {
+ return w.counter.Count + w.pqWriter.RowGroupTotalCompressedBytes()
Review Comment:
Isn't this going to double-count, since `RowGroupTotalCompressedBytes` would
also count what has been flushed, not only what's still buffered?
##########
table/rolling_data_writer.go:
##########
@@ -141,70 +284,79 @@ func (r *RollingDataWriter) stream(outputDataFilesCh
chan<- iceberg.DataFile) {
defer r.wg.Done()
defer close(r.errorCh)
- recordIter := func(yield func(arrow.RecordBatch, error) bool) {
- for record := range r.recordCh {
- if !yield(record, nil) {
- return
- }
+ var currentWriter tblutils.FileWriter
+ defer func() {
+ if currentWriter != nil {
+ currentWriter.Close()
}
- }
+ }()
- binPackedRecords := binPackRecords(recordIter, defaultBinPackLookback,
r.factory.targetFileSize)
Review Comment:
Why are we dropping the bin-packing of the records? I don't see it getting
used elsewhere. Unless the user is explicitly saying that they are providing
sorted records and sorted data, we should allow the bin-packing to keep file
sizes down, right? Or is there a reason why we're removing the bin-packing?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]