This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 293cbb84 fix(table): use proper data file names (#702)
293cbb84 is described below
commit 293cbb84ec241b062483c4f320de052eb204d1ba
Author: Matt Topol <[email protected]>
AuthorDate: Thu Feb 5 16:29:06 2026 -0500
fix(table): use proper data file names (#702)
fixes #697
Follows the same standard as the Java implementation to generate data file
names appropriately.
Adds a test to confirm the name generation.
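For illustration, a minimal sketch of the resulting naming scheme (the values
here are hypothetical; the real inputs come from the write path):

    package main

    import (
        "fmt"

        "github.com/google/uuid"
    )

    func main() {
        // Hypothetical inputs: partition index, task ID, per-partition file
        // count, and the UUID identifying the write operation.
        partitionID, taskID, fileCount := 0, 3, 1
        opID := uuid.MustParse("12345678-1234-1234-1234-123456789abc")

        // {partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}.{extension}
        name := fmt.Sprintf("%05d-%d-%s-%05d.%s", partitionID, taskID, opID, fileCount, "parquet")
        fmt.Println(name) // 00000-3-12345678-1234-1234-1234-123456789abc-00001.parquet
    }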
---------
Co-authored-by: Copilot <[email protected]>
---
internal/utils.go | 7 +-
table/arrow_utils.go | 16 +++--
table/rolling_data_writer.go | 42 +++++++----
table/writer.go | 7 +-
table/writer_test.go | 167 +++++++++++++++++++++++++++++++++++++++++++
5 files changed, 216 insertions(+), 23 deletions(-)
diff --git a/internal/utils.go b/internal/utils.go
index 2cc5027a..9ed69ea4 100644
--- a/internal/utils.go
+++ b/internal/utils.go
@@ -24,6 +24,7 @@ import (
"io"
"iter"
"slices"
+ "sync/atomic"
)
// Helper function to find the difference between two slices (a - b).
@@ -164,12 +165,14 @@ func RecoverError(err *error) {
}
func Counter(start int) iter.Seq[int] {
+ var current atomic.Int64
+ current.Store(int64(start) - 1)
+
return func(yield func(int) bool) {
for {
- if !yield(start) {
+ if !yield(int(current.Add(1))) {
return
}
- start++
}
}
}
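(For context, a minimal sketch of consuming the updated Counter through
iter.Pull, which is how the writer factory below obtains nextCount/stopCount;
the main function here is illustrative only.)

    package main

    import (
        "fmt"
        "iter"
        "sync/atomic"
    )

    // Counter as updated above: the position lives in an atomic shared by
    // every consumer of the returned sequence, so each yielded value is unique.
    func Counter(start int) iter.Seq[int] {
        var current atomic.Int64
        current.Store(int64(start) - 1)

        return func(yield func(int) bool) {
            for {
                if !yield(int(current.Add(1))) {
                    return
                }
            }
        }
    }

    func main() {
        nextCount, stopCount := iter.Pull(Counter(0))
        defer stopCount()

        for range 3 {
            cnt, _ := nextCount()
            fmt.Println(cnt) // 0, 1, 2
        }
    }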
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 0dffa74a..35d3b538 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -47,6 +47,8 @@ const (
// this key to identify the field id of the source Parquet field.
// We use this when converting to Iceberg to provide field IDs
ArrowParquetFieldIDKey = "PARQUET:field_id"
+
+ defaultBinPackLookback = 20
)
// ArrowSchemaVisitor is an interface that can be implemented and used to
@@ -1338,13 +1340,17 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata
tasks := func(yield func(WriteTask) bool) {
defer stopCount()
- for batch := range binPackRecords(args.itr, 20, targetFileSize) {
+ fileCount := 0
+ for batch := range binPackRecords(args.itr, defaultBinPackLookback, targetFileSize) {
cnt, _ := nextCount()
+ fileCount++
t := WriteTask{
- Uuid: *args.writeUUID,
- ID: cnt,
- Schema: taskSchema,
- Batches: batch,
+ Uuid: *args.writeUUID,
+ ID: cnt,
+ PartitionID: iceberg.UnpartitionedSpec.ID(),
+ FileCount: fileCount,
+ Schema: taskSchema,
+ Batches: batch,
}
if !yield(t) {
return
diff --git a/table/rolling_data_writer.go b/table/rolling_data_writer.go
index ac43d46f..020effdd 100644
--- a/table/rolling_data_writer.go
+++ b/table/rolling_data_writer.go
@@ -33,25 +33,31 @@ import (
// for different partitions, providing shared configuration and coordination
// across all writers in a partitioned write operation.
type writerFactory struct {
- rootLocation string
- args recordWritingArgs
- meta *MetadataBuilder
- taskSchema *iceberg.Schema
- targetFileSize int64
- writers sync.Map
- counter atomic.Int64
- mu sync.Mutex
+ rootLocation string
+ args recordWritingArgs
+ meta *MetadataBuilder
+ taskSchema *iceberg.Schema
+ targetFileSize int64
+ writers sync.Map
+ nextCount func() (int, bool)
+ stopCount func()
+ partitionIDCounter atomic.Int64 // partitionIDCounter generates unique IDs for partitions
+ mu sync.Mutex
}
// NewWriterFactory creates a new WriterFactory with the specified configuration
// for managing rolling data writers across partitions.
func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) writerFactory {
+ nextCount, stopCount := iter.Pull(args.counter)
+
return writerFactory{
rootLocation: rootLocation,
args: args,
meta: meta,
taskSchema: taskSchema,
targetFileSize: targetFileSize,
+ nextCount: nextCount,
+ stopCount: stopCount,
}
}
@@ -60,6 +66,8 @@ func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *Metadat
// file strategy to manage file sizes.
type RollingDataWriter struct {
partitionKey string
+ partitionID int // unique ID for this partition
+ fileCount atomic.Int64 // counter for files in this partition
recordCh chan arrow.RecordBatch
errorCh chan error
factory *writerFactory
@@ -73,8 +81,10 @@ type RollingDataWriter struct {
// with the given partition values.
func (w *writerFactory) NewRollingDataWriter(ctx context.Context, partition string, partitionValues map[int]any, outputDataFilesCh chan<- iceberg.DataFile) *RollingDataWriter {
ctx, cancel := context.WithCancel(ctx)
+ partitionID := int(w.partitionIDCounter.Add(1) - 1)
writer := &RollingDataWriter{
partitionKey: partition,
+ partitionID: partitionID,
recordCh: make(chan arrow.RecordBatch, 64),
errorCh: make(chan error, 1),
factory: w,
@@ -137,7 +147,7 @@ func (r *RollingDataWriter) stream(outputDataFilesCh chan<- iceberg.DataFile) {
}
}
- binPackedRecords := binPackRecords(recordIter, 20, r.factory.targetFileSize)
+ binPackedRecords := binPackRecords(recordIter, defaultBinPackLookback, r.factory.targetFileSize)
for batch := range binPackedRecords {
if err := r.flushToDataFile(batch, outputDataFilesCh); err != nil {
select {
@@ -156,13 +166,16 @@ func (r *RollingDataWriter) flushToDataFile(batch []arrow.RecordBatch, outputDat
}
task := iter.Seq[WriteTask](func(yield func(WriteTask) bool) {
- cnt := int(r.factory.counter.Add(1) - 1)
+ cnt, _ := r.factory.nextCount()
+ fileCount := int(r.fileCount.Add(1))
yield(WriteTask{
- Uuid: *r.factory.args.writeUUID,
- ID: cnt,
- Schema: r.factory.taskSchema,
- Batches: batch,
+ Uuid: *r.factory.args.writeUUID,
+ ID: cnt,
+ PartitionID: r.partitionID,
+ FileCount: fileCount,
+ Schema: r.factory.taskSchema,
+ Batches: batch,
})
})
@@ -216,6 +229,7 @@ func (r *RollingDataWriter) closeAndWait() error {
}
func (w *writerFactory) closeAll() error {
+ defer w.stopCount()
var writers []*RollingDataWriter
w.writers.Range(func(key, value any) bool {
writer, ok := value.(*RollingDataWriter)
diff --git a/table/writer.go b/table/writer.go
index ef7f7282..8952b6c6 100644
--- a/table/writer.go
+++ b/table/writer.go
@@ -33,6 +33,8 @@ import (
type WriteTask struct {
Uuid uuid.UUID
ID int
+ PartitionID int // PartitionID is the partition identifier used in data file naming.
+ FileCount int // FileCount is a sequential counter for files written by this task.
Schema *iceberg.Schema
Batches []arrow.RecordBatch
SortOrderID int
@@ -40,8 +42,9 @@ type WriteTask struct {
func (w WriteTask) GenerateDataFileName(extension string) string {
// Mimics the behavior in the Java API:
- // https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
- return fmt.Sprintf("00000-%d-%s.%s", w.ID, w.Uuid, extension)
+ // https://github.com/apache/iceberg/blob/03ed4ba9af4e47d32bdb22b7e3d033eb2a4b2c83/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L93
+ // Format: {partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}.{extension}
+ return fmt.Sprintf("%05d-%d-%s-%05d.%s", w.PartitionID, w.ID, w.Uuid, w.FileCount, extension)
}
type writer struct {
diff --git a/table/writer_test.go b/table/writer_test.go
new file mode 100644
index 00000000..5701c842
--- /dev/null
+++ b/table/writer_test.go
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "testing"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestGenerateDataFileName(t *testing.T) {
+ tests := []struct {
+ name string
+ task WriteTask
+ extension string
+ want string
+ }{
+ {
+ name: "unpartitioned table first file",
+ task: WriteTask{
+ Uuid: uuid.MustParse("12345678-1234-1234-1234-123456789abc"),
+ ID: 0,
+ PartitionID: 0,
+ FileCount: 1,
+ },
+ extension: "parquet",
+ want: "00000-0-12345678-1234-1234-1234-123456789abc-00001.parquet",
+ },
+ {
+ name: "unpartitioned table multiple files",
+ task: WriteTask{
+ Uuid: uuid.MustParse("12345678-1234-1234-1234-123456789abc"),
+ ID: 0,
+ PartitionID: 0,
+ FileCount: 42,
+ },
+ extension: "parquet",
+ want: "00000-0-12345678-1234-1234-1234-123456789abc-00042.parquet",
+ },
+ {
+ name: "partitioned table first partition",
+ task: WriteTask{
+ Uuid: uuid.MustParse("87654321-4321-4321-4321-cba987654321"),
+ ID: 1,
+ PartitionID: 0,
+ FileCount: 1,
+ },
+ extension: "parquet",
+ want: "00000-1-87654321-4321-4321-4321-cba987654321-00001.parquet",
+ },
+ {
+ name: "partitioned table second partition",
+ task: WriteTask{
+ Uuid: uuid.MustParse("87654321-4321-4321-4321-cba987654321"),
+ ID: 2,
+ PartitionID: 1,
+ FileCount: 1,
+ },
+ extension: "parquet",
+ want: "00001-2-87654321-4321-4321-4321-cba987654321-00001.parquet",
+ },
+ {
+ name: "partitioned table multiple files in partition",
+ task: WriteTask{
+ Uuid: uuid.MustParse("87654321-4321-4321-4321-cba987654321"),
+ ID: 3,
+ PartitionID: 2,
+ FileCount: 15,
+ },
+ extension: "parquet",
+ want: "00002-3-87654321-4321-4321-4321-cba987654321-00015.parquet",
+ },
+ {
+ name: "large partition ID",
+ task: WriteTask{
+ Uuid: uuid.MustParse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"),
+ ID: 100,
+ PartitionID: 12345,
+ FileCount: 99999,
+ },
+ extension: "parquet",
+ want: "12345-100-aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee-99999.parquet",
+ },
+ {
+ name: "more than 5 digits",
+ task: WriteTask{
+ Uuid: uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"),
+ ID: 123456,
+ PartitionID: 567890,
+ FileCount: 123456,
+ },
+ extension: "parquet",
+ want: "567890-123456-ffffffff-ffff-ffff-ffff-ffffffffffff-123456.parquet",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := tt.task.GenerateDataFileName(tt.extension)
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
+
+func TestGenerateDataFileNameFormat(t *testing.T) {
+ // Test that the format matches the Java implementation:
+ // {partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}.{extension}
+
+ task := WriteTask{
+ Uuid: uuid.MustParse("12345678-1234-1234-1234-123456789abc"),
+ ID: 5,
+ PartitionID: 3,
+ FileCount: 7,
+ }
+
+ filename := task.GenerateDataFileName("parquet")
+
+ // Verify format components
+ require.Contains(t, filename, "00003-", "partition ID should be 5-digit
padded")
+ require.Contains(t, filename, "-5-", "task ID should be present")
+ require.Contains(t, filename, "-12345678-1234-1234-1234-123456789abc-",
"UUID should be present")
+ require.Contains(t, filename, "-00007.parquet", "file count should be
5-digit padded with extension")
+
+ // Verify exact format
+ expected := "00003-5-12345678-1234-1234-1234-123456789abc-00007.parquet"
+ assert.Equal(t, expected, filename)
+}
+
+func TestGenerateDataFileNameUniqueness(t *testing.T) {
+ // Test that different tasks generate different filenames
+ baseUUID := uuid.MustParse("12345678-1234-1234-1234-123456789abc")
+
+ task1 := WriteTask{Uuid: baseUUID, ID: 0, PartitionID: 0, FileCount: 1}
+ task2 := WriteTask{Uuid: baseUUID, ID: 0, PartitionID: 0, FileCount: 2}
+ task3 := WriteTask{Uuid: baseUUID, ID: 1, PartitionID: 0, FileCount: 1}
+ task4 := WriteTask{Uuid: baseUUID, ID: 0, PartitionID: 1, FileCount: 1}
+
+ file1 := task1.GenerateDataFileName("parquet")
+ file2 := task2.GenerateDataFileName("parquet")
+ file3 := task3.GenerateDataFileName("parquet")
+ file4 := task4.GenerateDataFileName("parquet")
+
+ // All filenames should be unique
+ filenames := []string{file1, file2, file3, file4}
+ seen := make(map[string]bool)
+ for _, f := range filenames {
+ assert.False(t, seen[f], "filename %s should be unique", f)
+ seen[f] = true
+ }
+}