This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 293cbb84 fix(table): use proper data file names (#702)
293cbb84 is described below

commit 293cbb84ec241b062483c4f320de052eb204d1ba
Author: Matt Topol <[email protected]>
AuthorDate: Thu Feb 5 16:29:06 2026 -0500

    fix(table): use proper data file names (#702)
    
    fixes #697
    
    Follows the same naming convention as the Java implementation when
    generating data file names.
    
    Adds a test to confirm the name generation.
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 internal/utils.go            |   7 +-
 table/arrow_utils.go         |  16 +++--
 table/rolling_data_writer.go |  42 +++++++----
 table/writer.go              |   7 +-
 table/writer_test.go         | 167 +++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 216 insertions(+), 23 deletions(-)

diff --git a/internal/utils.go b/internal/utils.go
index 2cc5027a..9ed69ea4 100644
--- a/internal/utils.go
+++ b/internal/utils.go
@@ -24,6 +24,7 @@ import (
        "io"
        "iter"
        "slices"
+       "sync/atomic"
 )
 
 // Helper function to find the difference between two slices (a - b).
@@ -164,12 +165,14 @@ func RecoverError(err *error) {
 }
 
 func Counter(start int) iter.Seq[int] {
+       var current atomic.Int64
+       current.Store(int64(start) - 1)
+
        return func(yield func(int) bool) {
                for {
-                       if !yield(start) {
+                       if !yield(int(current.Add(1))) {
                                return
                        }
-                       start++
                }
        }
 }
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 0dffa74a..35d3b538 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -47,6 +47,8 @@ const (
        // this key to identify the field id of the source Parquet field.
        // We use this when converting to Iceberg to provide field IDs
        ArrowParquetFieldIDKey = "PARQUET:field_id"
+
+       defaultBinPackLookback = 20
 )
 
 // ArrowSchemaVisitor is an interface that can be implemented and used to
@@ -1338,13 +1340,17 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata
                tasks := func(yield func(WriteTask) bool) {
                        defer stopCount()
 
-                       for batch := range binPackRecords(args.itr, 20, targetFileSize) {
+                       fileCount := 0
+                       for batch := range binPackRecords(args.itr, defaultBinPackLookback, targetFileSize) {
                                cnt, _ := nextCount()
+                               fileCount++
                                t := WriteTask{
-                                       Uuid:    *args.writeUUID,
-                                       ID:      cnt,
-                                       Schema:  taskSchema,
-                                       Batches: batch,
+                                       Uuid:        *args.writeUUID,
+                                       ID:          cnt,
+                                       PartitionID: iceberg.UnpartitionedSpec.ID(),
+                                       FileCount:   fileCount,
+                                       Schema:      taskSchema,
+                                       Batches:     batch,
                                }
                                if !yield(t) {
                                        return
diff --git a/table/rolling_data_writer.go b/table/rolling_data_writer.go
index ac43d46f..020effdd 100644
--- a/table/rolling_data_writer.go
+++ b/table/rolling_data_writer.go
@@ -33,25 +33,31 @@ import (
 // for different partitions, providing shared configuration and coordination
 // across all writers in a partitioned write operation.
 type writerFactory struct {
-       rootLocation   string
-       args           recordWritingArgs
-       meta           *MetadataBuilder
-       taskSchema     *iceberg.Schema
-       targetFileSize int64
-       writers        sync.Map
-       counter        atomic.Int64
-       mu             sync.Mutex
+       rootLocation       string
+       args               recordWritingArgs
+       meta               *MetadataBuilder
+       taskSchema         *iceberg.Schema
+       targetFileSize     int64
+       writers            sync.Map
+       nextCount          func() (int, bool)
+       stopCount          func()
+       partitionIDCounter atomic.Int64 // partitionIDCounter generates unique IDs for partitions
+       mu                 sync.Mutex
 }
 
 // NewWriterFactory creates a new WriterFactory with the specified configuration
 // for managing rolling data writers across partitions.
 func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) writerFactory {
+       nextCount, stopCount := iter.Pull(args.counter)
+
        return writerFactory{
                rootLocation:   rootLocation,
                args:           args,
                meta:           meta,
                taskSchema:     taskSchema,
                targetFileSize: targetFileSize,
+               nextCount:      nextCount,
+               stopCount:      stopCount,
        }
 }
 
@@ -60,6 +66,8 @@ func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *Metadat
 // file strategy to manage file sizes.
 type RollingDataWriter struct {
        partitionKey    string
+       partitionID     int          // unique ID for this partition
+       fileCount       atomic.Int64 // counter for files in this partition
        recordCh        chan arrow.RecordBatch
        errorCh         chan error
        factory         *writerFactory
@@ -73,8 +81,10 @@ type RollingDataWriter struct {
 // with the given partition values.
 func (w *writerFactory) NewRollingDataWriter(ctx context.Context, partition string, partitionValues map[int]any, outputDataFilesCh chan<- iceberg.DataFile) *RollingDataWriter {
        ctx, cancel := context.WithCancel(ctx)
+       partitionID := int(w.partitionIDCounter.Add(1) - 1)
        writer := &RollingDataWriter{
                partitionKey:    partition,
+               partitionID:     partitionID,
                recordCh:        make(chan arrow.RecordBatch, 64),
                errorCh:         make(chan error, 1),
                factory:         w,
@@ -137,7 +147,7 @@ func (r *RollingDataWriter) stream(outputDataFilesCh chan<- iceberg.DataFile) {
                }
        }
 
-       binPackedRecords := binPackRecords(recordIter, 20, r.factory.targetFileSize)
+       binPackedRecords := binPackRecords(recordIter, defaultBinPackLookback, r.factory.targetFileSize)
        for batch := range binPackedRecords {
                if err := r.flushToDataFile(batch, outputDataFilesCh); err != nil {
                        select {
@@ -156,13 +166,16 @@ func (r *RollingDataWriter) flushToDataFile(batch []arrow.RecordBatch, outputDat
        }
 
        task := iter.Seq[WriteTask](func(yield func(WriteTask) bool) {
-               cnt := int(r.factory.counter.Add(1) - 1)
+               cnt, _ := r.factory.nextCount()
+               fileCount := int(r.fileCount.Add(1))
 
                yield(WriteTask{
-                       Uuid:    *r.factory.args.writeUUID,
-                       ID:      cnt,
-                       Schema:  r.factory.taskSchema,
-                       Batches: batch,
+                       Uuid:        *r.factory.args.writeUUID,
+                       ID:          cnt,
+                       PartitionID: r.partitionID,
+                       FileCount:   fileCount,
+                       Schema:      r.factory.taskSchema,
+                       Batches:     batch,
                })
        })
 
@@ -216,6 +229,7 @@ func (r *RollingDataWriter) closeAndWait() error {
 }
 
 func (w *writerFactory) closeAll() error {
+       defer w.stopCount()
        var writers []*RollingDataWriter
        w.writers.Range(func(key, value any) bool {
                writer, ok := value.(*RollingDataWriter)
diff --git a/table/writer.go b/table/writer.go
index ef7f7282..8952b6c6 100644
--- a/table/writer.go
+++ b/table/writer.go
@@ -33,6 +33,8 @@ import (
 type WriteTask struct {
        Uuid        uuid.UUID
        ID          int
+       PartitionID int // PartitionID is the partition identifier used in data file naming.
+       FileCount   int // FileCount is a sequential counter for files written by this task.
        Schema      *iceberg.Schema
        Batches     []arrow.RecordBatch
        SortOrderID int
@@ -40,8 +42,9 @@ type WriteTask struct {
 
 func (w WriteTask) GenerateDataFileName(extension string) string {
        // Mimics the behavior in the Java API:
-       // https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
-       return fmt.Sprintf("00000-%d-%s.%s", w.ID, w.Uuid, extension)
+       // https://github.com/apache/iceberg/blob/03ed4ba9af4e47d32bdb22b7e3d033eb2a4b2c83/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L93
+       // Format: {partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}.{extension}
+       return fmt.Sprintf("%05d-%d-%s-%05d.%s", w.PartitionID, w.ID, w.Uuid, w.FileCount, extension)
 }
 
 type writer struct {
diff --git a/table/writer_test.go b/table/writer_test.go
new file mode 100644
index 00000000..5701c842
--- /dev/null
+++ b/table/writer_test.go
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "testing"
+
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestGenerateDataFileName(t *testing.T) {
+       tests := []struct {
+               name      string
+               task      WriteTask
+               extension string
+               want      string
+       }{
+               {
+                       name: "unpartitioned table first file",
+                       task: WriteTask{
+                               Uuid:        uuid.MustParse("12345678-1234-1234-1234-123456789abc"),
+                               ID:          0,
+                               PartitionID: 0,
+                               FileCount:   1,
+                       },
+                       extension: "parquet",
+                       want:      "00000-0-12345678-1234-1234-1234-123456789abc-00001.parquet",
+               },
+               {
+                       name: "unpartitioned table multiple files",
+                       task: WriteTask{
+                               Uuid:        uuid.MustParse("12345678-1234-1234-1234-123456789abc"),
+                               ID:          0,
+                               PartitionID: 0,
+                               FileCount:   42,
+                       },
+                       extension: "parquet",
+                       want:      "00000-0-12345678-1234-1234-1234-123456789abc-00042.parquet",
+               },
+               {
+                       name: "partitioned table first partition",
+                       task: WriteTask{
+                               Uuid:        uuid.MustParse("87654321-4321-4321-4321-cba987654321"),
+                               ID:          1,
+                               PartitionID: 0,
+                               FileCount:   1,
+                       },
+                       extension: "parquet",
+                       want:      "00000-1-87654321-4321-4321-4321-cba987654321-00001.parquet",
+               },
+               {
+                       name: "partitioned table second partition",
+                       task: WriteTask{
+                               Uuid:        uuid.MustParse("87654321-4321-4321-4321-cba987654321"),
+                               ID:          2,
+                               PartitionID: 1,
+                               FileCount:   1,
+                       },
+                       extension: "parquet",
+                       want:      "00001-2-87654321-4321-4321-4321-cba987654321-00001.parquet",
+               },
+               {
+                       name: "partitioned table multiple files in partition",
+                       task: WriteTask{
+                               Uuid:        uuid.MustParse("87654321-4321-4321-4321-cba987654321"),
+                               ID:          3,
+                               PartitionID: 2,
+                               FileCount:   15,
+                       },
+                       extension: "parquet",
+                       want:      "00002-3-87654321-4321-4321-4321-cba987654321-00015.parquet",
+               },
+               {
+                       name: "large partition ID",
+                       task: WriteTask{
+                               Uuid:        uuid.MustParse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"),
+                               ID:          100,
+                               PartitionID: 12345,
+                               FileCount:   99999,
+                       },
+                       extension: "parquet",
+                       want:      "12345-100-aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee-99999.parquet",
+               },
+               {
+                       name: "more than 5 digits",
+                       task: WriteTask{
+                               Uuid:        uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"),
+                               ID:          123456,
+                               PartitionID: 567890,
+                               FileCount:   123456,
+                       },
+                       extension: "parquet",
+                       want:      "567890-123456-ffffffff-ffff-ffff-ffff-ffffffffffff-123456.parquet",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       got := tt.task.GenerateDataFileName(tt.extension)
+                       assert.Equal(t, tt.want, got)
+               })
+       }
+}
+
+func TestGenerateDataFileNameFormat(t *testing.T) {
+       // Test that the format matches the Java implementation:
+       // {partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}.{extension}
+
+       task := WriteTask{
+               Uuid:        uuid.MustParse("12345678-1234-1234-1234-123456789abc"),
+               ID:          5,
+               PartitionID: 3,
+               FileCount:   7,
+       }
+
+       filename := task.GenerateDataFileName("parquet")
+
+       // Verify format components
+       require.Contains(t, filename, "00003-", "partition ID should be 5-digit padded")
+       require.Contains(t, filename, "-5-", "task ID should be present")
+       require.Contains(t, filename, "-12345678-1234-1234-1234-123456789abc-", "UUID should be present")
+       require.Contains(t, filename, "-00007.parquet", "file count should be 5-digit padded with extension")
+
+       // Verify exact format
+       expected := "00003-5-12345678-1234-1234-1234-123456789abc-00007.parquet"
+       assert.Equal(t, expected, filename)
+}
+
+func TestGenerateDataFileNameUniqueness(t *testing.T) {
+       // Test that different tasks generate different filenames
+       baseUUID := uuid.MustParse("12345678-1234-1234-1234-123456789abc")
+
+       task1 := WriteTask{Uuid: baseUUID, ID: 0, PartitionID: 0, FileCount: 1}
+       task2 := WriteTask{Uuid: baseUUID, ID: 0, PartitionID: 0, FileCount: 2}
+       task3 := WriteTask{Uuid: baseUUID, ID: 1, PartitionID: 0, FileCount: 1}
+       task4 := WriteTask{Uuid: baseUUID, ID: 0, PartitionID: 1, FileCount: 1}
+
+       file1 := task1.GenerateDataFileName("parquet")
+       file2 := task2.GenerateDataFileName("parquet")
+       file3 := task3.GenerateDataFileName("parquet")
+       file4 := task4.GenerateDataFileName("parquet")
+
+       // All filenames should be unique
+       filenames := []string{file1, file2, file3, file4}
+       seen := make(map[string]bool)
+       for _, f := range filenames {
+               assert.False(t, seen[f], "filename %s should be unique", f)
+               seen[f] = true
+       }
+}

Reply via email to