This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new a5765060 feat: add functions for adding and replacing data directly with datafiles (#723)
a5765060 is described below

commit a57650605af2d2aaf50a3a93bb2647794769c998
Author: Adam Gaddis <[email protected]>
AuthorDate: Thu Feb 19 13:43:03 2026 -0600

    feat: add functions for adding and replacing data directly with datafiles (#723)
    
    # Context
    
    If you want to write your own Parquet files and use Iceberg only to
    manage the metadata, you are (for the most part) left with the
    `ReplaceDataFiles` function.

    This function takes a list of existing file paths and a list of new
    file paths whose data replaces the previous data.
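
    For illustration, the existing path-based call looks roughly like
    this (`ctx`, `tbl`, and the two path slices are placeholders):

    ```go
    tx := tbl.NewTransaction()
    // ReplaceDataFiles takes string paths and scans each new file to
    // derive its schema and statistics before committing.
    err := tx.ReplaceDataFiles(ctx, existingPaths, newPaths, nil)
    ```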
    
    This works fine for the most part, but the function performs a scan,
    which means it does not take your word that your new Parquet files
    match the table schema.

    This scan proves problematic when you are writing files very quickly
    and leveraging multipart uploads. You know the location of every file
    and know they are valid Parquet files, but the commit can still return
    an error because a file might not be fully available at commit time.

    The error looks something like this at commit time: `failed to replace
    data files: error encountered during file conversion: parquet: could not
    read 8 bytes from end of file`.
    
    # Solution

    We have tested this out in vendored code and opened a fork that adds a
    new function.

    `ReplaceDataFiles` scans your file paths to ensure that the schema of
    those files matches the schema of the table you are inserting them
    into.
    
    We, and I would assume many people writing their own Parquet files,
    don't need this. Our ingestion framework guarantees we will never
    produce an incorrect Parquet file, and we also have access to our
    Parquet schema and Arrow schema for the entirety of the ingestion.
    
    So I can build data files directly and would much rather pass my own
    DataFiles to this function, since I know the files will eventually be
    available and will be correct. All this does is tell the metadata
    where to look for each file; there is no real harm in committing
    before a file is actually available unless you query it right away
    and it happens to still be unavailable.

    This also speeds up commits tremendously, as this library no longer
    needs to scan every file for every single commit.
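
    As a sketch of the new flow (assuming `ctx`, `tbl`, `path`,
    `recordCount`, and `fileSizeBytes`; the two nil arguments mirror how
    the tests in this commit call the builder):

    ```go
    // Build a DataFile purely from metadata we already know; no file I/O.
    builder, err := iceberg.NewDataFileBuilder(*iceberg.UnpartitionedSpec,
            iceberg.EntryContentData, path, iceberg.ParquetFile,
            nil,      // partition values (none for an unpartitioned spec)
            nil, nil, // optional arguments, passed as nil in this commit's tests
            recordCount, fileSizeBytes)
    if err != nil {
            return err
    }
    df := builder.Build()

    tx := tbl.NewTransaction()
    // Append without scanning, or atomically swap old files for new ones
    // via tx.ReplaceDataFilesWithDataFiles(ctx, toDelete, toAdd, nil).
    if err := tx.AddDataFiles(ctx, []iceberg.DataFile{df}, nil); err != nil {
            return err
    }
    ```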
    
    Co-authored-by: Adam Gaddis <[email protected]>
---
 table/table_test.go  | 509 +++++++++++++++++++++++++++++++++++++++++++++++++++
 table/transaction.go | 248 +++++++++++++++++++++++++
 2 files changed, 757 insertions(+)

diff --git a/table/table_test.go b/table/table_test.go
index ae1bbdde..7f7e0e57 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -71,6 +71,22 @@ func mustFS(t *testing.T, tbl *table.Table) iceio.IO {
        return r
 }
 
+// mustFileSize returns the file size for a test path and fails the test on error.
+func mustFileSize(t *testing.T, path string) int64 {
+       info, err := os.Stat(path)
+       require.NoError(t, err)
+
+       return info.Size()
+}
+
+// mustDataFile builds a test DataFile and fails the test if construction fails.
+func mustDataFile(t *testing.T, spec iceberg.PartitionSpec, path string, partition map[int]any, count, size int64) iceberg.DataFile {
+       builder, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentData, path, iceberg.ParquetFile, partition, nil, nil, count, size)
+       require.NoError(t, err)
+
+       return builder.Build()
+}
+
 func (t *TableTestSuite) SetupSuite() {
        var mockfs internal.MockFS
        mockfs.Test(t.T())
@@ -922,6 +938,499 @@ func (t *TableWritingTestSuite) TestReplaceDataFiles() {
        }, staged.CurrentSnapshot().Summary)
 }
 
+func (t *TableWritingTestSuite) TestAddDataFiles() {
+       ident := table.Identifier{"default", "add_data_files_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       filePath := fmt.Sprintf("%s/add_data_files_v%d/test.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), filePath, t.arrTbl)
+
+       df := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, filePath, nil, 1, mustFileSize(t.T(), filePath))
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddDataFiles(t.ctx, []iceberg.DataFile{df}, nil))
+
+       staged, err := tx.StagedTable()
+       t.Require().NoError(err)
+       t.Equal(table.OpAppend, staged.CurrentSnapshot().Summary.Operation)
+       t.Equal("1", staged.CurrentSnapshot().Summary.Properties["added-data-files"])
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFiles() {
+       ident := table.Identifier{"default", "replace_data_files_with_datafiles_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       files := make([]string, 0, 3)
+       for i := range 3 {
+               filePath := fmt.Sprintf("%s/replace_data_files_with_datafiles_v%d/data-%d.parquet", t.location, t.formatVersion, i)
+               t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), filePath, t.arrTbl)
+               files = append(files, filePath)
+       }
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, files, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       replacementPath := fmt.Sprintf("%s/replace_data_files_with_datafiles_v%d/replacement.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), replacementPath, t.arrTbl)
+
+       deleteFiles := []iceberg.DataFile{
+               mustDataFile(t.T(), *iceberg.UnpartitionedSpec, files[0], nil, 1, mustFileSize(t.T(), files[0])),
+               mustDataFile(t.T(), *iceberg.UnpartitionedSpec, files[1], nil, 1, mustFileSize(t.T(), files[1])),
+       }
+       addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, replacementPath, nil, 1, mustFileSize(t.T(), replacementPath))
+
+       tx = tbl.NewTransaction()
+       t.Require().NoError(tx.ReplaceDataFilesWithDataFiles(t.ctx, deleteFiles, []iceberg.DataFile{addFile}, nil))
+
+       staged, err := tx.StagedTable()
+       t.Require().NoError(err)
+       t.Equal(table.OpOverwrite, staged.CurrentSnapshot().Summary.Operation)
+       t.Equal("1", staged.CurrentSnapshot().Summary.Properties["added-data-files"])
+       t.Equal("2", staged.CurrentSnapshot().Summary.Properties["deleted-data-files"])
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesValidatesPartitionSpecID() {
+       ident := table.Identifier{"default", "replace_data_files_with_datafiles_spec_validation_v" + strconv.Itoa(t.formatVersion)}
+       spec := iceberg.NewPartitionSpec(
+               iceberg.PartitionField{SourceID: 4, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "baz"},
+       )
+       tbl := t.createTable(ident, t.formatVersion, spec, t.tableSchema)
+
+       existingPath := fmt.Sprintf("%s/replace_data_files_with_datafiles_spec_validation_v%d/existing.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       invalidSpec := iceberg.NewPartitionSpecID(999,
+               iceberg.PartitionField{SourceID: 4, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "baz"},
+       )
+
+       deleteFile := mustDataFile(t.T(), spec, existingPath, map[int]any{1000: int32(123)}, 1, mustFileSize(t.T(), existingPath))
+       addFile := mustDataFile(t.T(), invalidSpec,
+               fmt.Sprintf("%s/replace_data_files_with_datafiles_spec_validation_v%d/replacement.parquet", t.location, t.formatVersion),
+               map[int]any{1000: int32(123)}, 1, 1)
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{deleteFile}, []iceberg.DataFile{addFile}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "invalid partition spec id")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesValidatesPartitionData() {
+       ident := table.Identifier{"default", "replace_data_files_with_datafiles_partition_validation_v" + strconv.Itoa(t.formatVersion)}
+       spec := iceberg.NewPartitionSpec(
+               iceberg.PartitionField{SourceID: 4, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "baz"},
+       )
+       tbl := t.createTable(ident, t.formatVersion, spec, t.tableSchema)
+
+       existingPath := fmt.Sprintf("%s/replace_data_files_with_datafiles_partition_validation_v%d/existing.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       deleteFile := mustDataFile(t.T(), spec, existingPath, map[int]any{1000: int32(123)}, 1, mustFileSize(t.T(), existingPath))
+       addFile := mustDataFile(t.T(), spec,
+               fmt.Sprintf("%s/replace_data_files_with_datafiles_partition_validation_v%d/replacement.parquet", t.location, t.formatVersion),
+               map[int]any{}, 1, 1)
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{deleteFile}, []iceberg.DataFile{addFile}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "missing partition value")
+}
+
+// ============================================================================
+// AddDataFiles Error Path Tests
+// ============================================================================
+
+func (t *TableWritingTestSuite) TestAddDataFilesDuplicateFilePaths() {
+       ident := table.Identifier{"default", "add_data_files_duplicate_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       filePath := fmt.Sprintf("%s/add_data_files_duplicate_v%d/test.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), filePath, t.arrTbl)
+
+       df := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, filePath, nil, 1, mustFileSize(t.T(), filePath))
+
+       tx := tbl.NewTransaction()
+       err := tx.AddDataFiles(t.ctx, []iceberg.DataFile{df, df}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "add data file paths must be unique for AddDataFiles")
+}
+
+func (t *TableWritingTestSuite) TestAddDataFilesAlreadyReferencedByTable() {
+       ident := table.Identifier{"default", "add_data_files_already_referenced_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       filePath := fmt.Sprintf("%s/add_data_files_already_referenced_v%d/test.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), filePath, t.arrTbl)
+
+       df := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, filePath, nil, 1, mustFileSize(t.T(), filePath))
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddDataFiles(t.ctx, []iceberg.DataFile{df}, nil))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       tx = tbl.NewTransaction()
+       err = tx.AddDataFiles(t.ctx, []iceberg.DataFile{df}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "cannot add files that are already referenced by table")
+}
+
+func (t *TableWritingTestSuite) TestAddDataFilesNilDataFile() {
+       ident := table.Identifier{"default", "add_data_files_nil_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       tx := tbl.NewTransaction()
+       err := tx.AddDataFiles(t.ctx, []iceberg.DataFile{nil}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "nil data file at index 0")
+}
+
+func (t *TableWritingTestSuite) TestAddDataFilesInvalidContentType() {
+       ident := table.Identifier{"default", "add_data_files_invalid_content_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       filePath := fmt.Sprintf("%s/add_data_files_invalid_content_v%d/test.parquet", t.location, t.formatVersion)
+
+       builder, err := iceberg.NewDataFileBuilder(*iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes, filePath, iceberg.ParquetFile, nil, nil, nil, 1, 100)
+       t.Require().NoError(err)
+       df := builder.Build()
+
+       tx := tbl.NewTransaction()
+       err = tx.AddDataFiles(t.ctx, []iceberg.DataFile{df}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "adding files other than data files is not yet implemented")
+}
+
+func (t *TableWritingTestSuite) TestAddDataFilesEmptySlice() {
+       ident := table.Identifier{"default", "add_data_files_empty_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       tx := tbl.NewTransaction()
+       err := tx.AddDataFiles(t.ctx, []iceberg.DataFile{}, nil)
+       t.NoError(err)
+
+       staged, err := tx.StagedTable()
+       t.Require().NoError(err)
+       t.Nil(staged.CurrentSnapshot())
+}
+
+// ============================================================================
+// ReplaceDataFilesWithDataFiles Error Path Tests
+// ============================================================================
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesDuplicateAddPaths() {
+       ident := table.Identifier{"default", "replace_data_files_dup_add_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       existingPath := fmt.Sprintf("%s/replace_data_files_dup_add_v%d/existing.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       deleteFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, existingPath, nil, 1, mustFileSize(t.T(), existingPath))
+       addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+               fmt.Sprintf("%s/replace_data_files_dup_add_v%d/new.parquet", t.location, t.formatVersion), nil, 1, 100)
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{deleteFile}, []iceberg.DataFile{addFile, addFile}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "add data file paths must be unique for ReplaceDataFilesWithDataFiles")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesDuplicateDeletePaths() {
+       ident := table.Identifier{"default", "replace_data_files_dup_del_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       existingPath := fmt.Sprintf("%s/replace_data_files_dup_del_v%d/existing.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       deleteFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, existingPath, nil, 1, mustFileSize(t.T(), existingPath))
+       addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+               fmt.Sprintf("%s/replace_data_files_dup_del_v%d/new.parquet", t.location, t.formatVersion), nil, 1, 100)
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{deleteFile, deleteFile}, []iceberg.DataFile{addFile}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "delete data file paths must be unique for ReplaceDataFilesWithDataFiles")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesAddAlreadyReferenced() {
+       ident := table.Identifier{"default", "replace_data_files_add_ref_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       existingPath1 := fmt.Sprintf("%s/replace_data_files_add_ref_v%d/existing1.parquet", t.location, t.formatVersion)
+       existingPath2 := fmt.Sprintf("%s/replace_data_files_add_ref_v%d/existing2.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath1, t.arrTbl)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath2, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath1, existingPath2}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       deleteFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, existingPath1, nil, 1, mustFileSize(t.T(), existingPath1))
+       addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, existingPath2, nil, 1, mustFileSize(t.T(), existingPath2))
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{deleteFile}, []iceberg.DataFile{addFile}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "cannot add files that are already referenced by table")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesDeleteNotInTable() {
+       ident := table.Identifier{"default", "replace_data_files_del_not_found_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       existingPath := fmt.Sprintf("%s/replace_data_files_del_not_found_v%d/existing.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       nonExistentFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+               fmt.Sprintf("%s/replace_data_files_del_not_found_v%d/nonexistent.parquet", t.location, t.formatVersion), nil, 1, 100)
+       addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+               fmt.Sprintf("%s/replace_data_files_del_not_found_v%d/new.parquet", t.location, t.formatVersion), nil, 1, 100)
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{nonExistentFile}, []iceberg.DataFile{addFile}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "cannot delete files that do not belong to the table")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesNilDeleteFile() {
+       ident := table.Identifier{"default", "replace_data_files_nil_del_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       existingPath := fmt.Sprintf("%s/replace_data_files_nil_del_v%d/existing.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+               fmt.Sprintf("%s/replace_data_files_nil_del_v%d/new.parquet", t.location, t.formatVersion), nil, 1, 100)
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{nil}, []iceberg.DataFile{addFile}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "nil data file at index 0")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesNilAddFile() {
+       ident := table.Identifier{"default", "replace_data_files_nil_add_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       existingPath := fmt.Sprintf("%s/replace_data_files_nil_add_v%d/existing.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       deleteFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, existingPath, nil, 1, mustFileSize(t.T(), existingPath))
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{deleteFile}, []iceberg.DataFile{nil}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "nil data file at index 0")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesNoSnapshot() {
+       ident := table.Identifier{"default", "replace_data_files_no_snapshot_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       deleteFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+               fmt.Sprintf("%s/replace_data_files_no_snapshot_v%d/delete.parquet", t.location, t.formatVersion), nil, 1, 100)
+       addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+               fmt.Sprintf("%s/replace_data_files_no_snapshot_v%d/add.parquet", t.location, t.formatVersion), nil, 1, 100)
+
+       tx := tbl.NewTransaction()
+       err := tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{deleteFile}, []iceberg.DataFile{addFile}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "cannot replace files in a table without an existing snapshot")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesInvalidAddContentType() {
+       ident := table.Identifier{"default", "replace_data_files_invalid_add_content_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       existingPath := fmt.Sprintf("%s/replace_data_files_invalid_add_content_v%d/existing.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       deleteFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, existingPath, nil, 1, mustFileSize(t.T(), existingPath))
+
+       builder, err := iceberg.NewDataFileBuilder(*iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes,
+               fmt.Sprintf("%s/replace_data_files_invalid_add_content_v%d/new.parquet", t.location, t.formatVersion),
+               iceberg.ParquetFile, nil, nil, nil, 1, 100)
+       t.Require().NoError(err)
+       invalidAddFile := builder.Build()
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{deleteFile}, []iceberg.DataFile{invalidAddFile}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "adding files other than data files is not yet implemented")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesEmptyBoth() {
+       ident := table.Identifier{"default", "replace_data_files_empty_both_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       tx := tbl.NewTransaction()
+       err := tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{}, []iceberg.DataFile{}, nil)
+       t.NoError(err)
+
+       staged, err := tx.StagedTable()
+       t.Require().NoError(err)
+       t.Nil(staged.CurrentSnapshot())
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesDelegatesToAddDataFiles() {
+       ident := table.Identifier{"default", "replace_data_files_delegate_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       filePath := fmt.Sprintf("%s/replace_data_files_delegate_v%d/test.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), filePath, t.arrTbl)
+
+       addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, filePath, nil, 1, mustFileSize(t.T(), filePath))
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{}, []iceberg.DataFile{addFile}, nil))
+
+       staged, err := tx.StagedTable()
+       t.Require().NoError(err)
+       t.Equal(table.OpAppend, staged.CurrentSnapshot().Summary.Operation)
+       t.Equal("1", staged.CurrentSnapshot().Summary.Properties["added-data-files"])
+}
+
+// ============================================================================
+// ReplaceDataFiles (string paths) Error Path Tests
+// ============================================================================
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesDuplicateDeletePaths() {
+       ident := table.Identifier{"default", "replace_data_files_dup_del_paths_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       existingPath := fmt.Sprintf("%s/replace_data_files_dup_del_paths_v%d/existing.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       newPath := fmt.Sprintf("%s/replace_data_files_dup_del_paths_v%d/new.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), newPath, t.arrTbl)
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFiles(t.ctx, []string{existingPath, existingPath}, []string{newPath}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "delete file paths must be unique for ReplaceDataFiles")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesDuplicateAddPaths() {
+       ident := table.Identifier{"default", "replace_data_files_dup_add_paths_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       existingPath := fmt.Sprintf("%s/replace_data_files_dup_add_paths_v%d/existing.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       newPath := fmt.Sprintf("%s/replace_data_files_dup_add_paths_v%d/new.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), newPath, t.arrTbl)
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFiles(t.ctx, []string{existingPath}, []string{newPath, newPath}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "add file paths must be unique for ReplaceDataFiles")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesAddAlreadyReferenced() {
+       ident := table.Identifier{"default", "replace_data_files_add_already_ref_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       existingPath1 := fmt.Sprintf("%s/replace_data_files_add_already_ref_v%d/existing1.parquet", t.location, t.formatVersion)
+       existingPath2 := fmt.Sprintf("%s/replace_data_files_add_already_ref_v%d/existing2.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath1, t.arrTbl)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath2, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath1, existingPath2}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFiles(t.ctx, []string{existingPath1}, []string{existingPath2}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "cannot add files that are already referenced by table")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesDeleteNotInTable() {
+       ident := table.Identifier{"default", "replace_data_files_del_not_in_table_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       existingPath := fmt.Sprintf("%s/replace_data_files_del_not_in_table_v%d/existing.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil, false))
+       tbl, err := tx.Commit(t.ctx)
+       t.Require().NoError(err)
+
+       newPath := fmt.Sprintf("%s/replace_data_files_del_not_in_table_v%d/new.parquet", t.location, t.formatVersion)
+       t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), newPath, t.arrTbl)
+
+       nonExistentPath := fmt.Sprintf("%s/replace_data_files_del_not_in_table_v%d/nonexistent.parquet", t.location, t.formatVersion)
+
+       tx = tbl.NewTransaction()
+       err = tx.ReplaceDataFiles(t.ctx, []string{nonExistentPath}, []string{newPath}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "cannot delete files that do not belong to the table")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesNoSnapshot() {
+       ident := table.Identifier{"default", "replace_data_files_no_snapshot_paths_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       tx := tbl.NewTransaction()
+       err := tx.ReplaceDataFiles(t.ctx, []string{"delete.parquet"}, []string{"add.parquet"}, nil)
+       t.Error(err)
+       t.ErrorContains(err, "cannot replace files in a table without an existing snapshot")
+}
+
 func (t *TableWritingTestSuite) TestExpireSnapshots() {
        fs := iceio.LocalFS{}
 
diff --git a/table/transaction.go b/table/transaction.go
index 06b01631..00fe7354 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -435,6 +435,254 @@ func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, files
        return t.apply(updates, reqs)
 }
 
+// validateDataFilePartitionData verifies that DataFile partition values match
+// the current partition spec fields by ID without reading file contents.
+func validateDataFilePartitionData(df iceberg.DataFile, spec *iceberg.PartitionSpec) error {
+       partitionData := df.Partition()
+       if partitionData == nil {
+               partitionData = map[int]any{}
+       }
+
+       expectedFieldIDs := make(map[int]string)
+       for field := range spec.Fields() {
+               expectedFieldIDs[field.FieldID] = field.Name
+               if _, ok := partitionData[field.FieldID]; !ok {
+                       return fmt.Errorf("missing partition value for field id %d (%s)", field.FieldID, field.Name)
+               }
+       }
+
+       for fieldID := range partitionData {
+               if _, ok := expectedFieldIDs[fieldID]; !ok {
+                       return fmt.Errorf("unknown partition field id %d for spec id %d", fieldID, spec.ID())
+               }
+       }
+
+       return nil
+}
+
+// validateDataFilesToAdd performs metadata-only validation for caller-provided
+// DataFiles and returns a set of paths that passed validation.
+func (t *Transaction) validateDataFilesToAdd(dataFiles []iceberg.DataFile, operation string) (map[string]struct{}, error) {
+       currentSpec, err := t.meta.CurrentSpec()
+       if err != nil || currentSpec == nil {
+               return nil, fmt.Errorf("could not get current partition spec: %w", err)
+       }
+
+       expectedSpecID := int32(currentSpec.ID())
+       setToAdd := make(map[string]struct{}, len(dataFiles))
+
+       for i, df := range dataFiles {
+               if df == nil {
+                       return nil, fmt.Errorf("nil data file at index %d for %s", i, operation)
+               }
+
+               path := df.FilePath()
+               if path == "" {
+                       return nil, fmt.Errorf("data file path cannot be empty for %s", operation)
+               }
+
+               if _, ok := setToAdd[path]; ok {
+                       return nil, fmt.Errorf("add data file paths must be unique for %s", operation)
+               }
+               setToAdd[path] = struct{}{}
+
+               if df.ContentType() != iceberg.EntryContentData {
+                       return nil, fmt.Errorf("adding files other than data files is not yet implemented: file %s has content type %s for %s", path, df.ContentType(), operation)
+               }
+
+               switch df.FileFormat() {
+               case iceberg.ParquetFile, iceberg.OrcFile, iceberg.AvroFile:
+               default:
+                       return nil, fmt.Errorf("data file %s has invalid file format %s for %s", path, df.FileFormat(), operation)
+               }
+
+               if df.SpecID() != expectedSpecID {
+                       return nil, fmt.Errorf("data file %s has invalid partition spec id %d for %s: expected %d",
+                               path, df.SpecID(), operation, expectedSpecID)
+               }
+
+               if err := validateDataFilePartitionData(df, currentSpec); err != nil {
+                       return nil, fmt.Errorf("data file %s has invalid partition data for %s: %w", path, operation, err)
+               }
+       }
+
+       return setToAdd, nil
+}
+
+// AddDataFiles adds pre-built DataFiles to the table without scanning them from storage.
+// This is useful for clients who have already constructed DataFile objects with metadata,
+// avoiding the need to read files to extract schema and statistics.
+//
+// Unlike AddFiles, this method does not read files from storage. It validates only metadata
+// that can be checked without opening files (for example spec-id and partition field IDs).
+//
+// Callers are responsible for ensuring each DataFile is valid and consistent with the table.
+// Supplying incorrect DataFile metadata can produce an invalid snapshot and break reads.
+func (t *Transaction) AddDataFiles(ctx context.Context, dataFiles []iceberg.DataFile, snapshotProps iceberg.Properties) error {
+       if len(dataFiles) == 0 {
+               return nil
+       }
+
+       setToAdd, err := t.validateDataFilesToAdd(dataFiles, "AddDataFiles")
+       if err != nil {
+               return err
+       }
+
+       fs, err := t.tbl.fsF(ctx)
+       if err != nil {
+               return err
+       }
+
+       if s := t.meta.currentSnapshot(); s != nil {
+               referenced := make([]string, 0)
+               for df, err := range s.dataFiles(fs, nil) {
+                       if err != nil {
+                               return err
+                       }
+
+                       if _, ok := setToAdd[df.FilePath()]; ok {
+                               referenced = append(referenced, df.FilePath())
+                       }
+               }
+
+               if len(referenced) > 0 {
+                       return fmt.Errorf("cannot add files that are already referenced by table, files: %s", referenced)
+               }
+       }
+
+       if t.meta.NameMapping() == nil {
+               nameMapping := t.meta.CurrentSchema().NameMapping()
+               mappingJson, err := json.Marshal(nameMapping)
+               if err != nil {
+                       return err
+               }
+               err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)})
+               if err != nil {
+                       return err
+               }
+       }
+
+       appendFiles := t.appendSnapshotProducer(fs, snapshotProps)
+       for _, df := range dataFiles {
+               appendFiles.appendDataFile(df)
+       }
+
+       updates, reqs, err := appendFiles.commit()
+       if err != nil {
+               return err
+       }
+
+       return t.apply(updates, reqs)
+}
+
+// ReplaceDataFilesWithDataFiles replaces files using pre-built DataFile objects.
+// This avoids scanning files to extract schema and statistics - the caller provides
+// DataFile objects directly with all required metadata.
+//
+// For the files to add, use iceberg.NewDataFileBuilder to construct DataFile objects
+// with the appropriate metadata (path, record count, file size, partition values).
+//
+// This method does not open files. It validates only metadata that can be checked
+// without reading file contents.
+//
+// Callers are responsible for ensuring each DataFile is valid and consistent with the table.
+// Supplying incorrect DataFile metadata can produce an invalid snapshot and break reads.
+//
+// This is useful when:
+//   - Files are written via a separate I/O path and metadata is already known
+//   - Avoiding file scanning improves performance or reliability
+//   - Working with storage systems where immediate file reads may be unreliable
+func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx context.Context, filesToDelete, filesToAdd []iceberg.DataFile, snapshotProps iceberg.Properties) error {
+       if len(filesToDelete) == 0 {
+               if len(filesToAdd) > 0 {
+                       return t.AddDataFiles(ctx, filesToAdd, snapshotProps)
+               }
+
+               return nil
+       }
+
+       setToAdd, err := t.validateDataFilesToAdd(filesToAdd, "ReplaceDataFilesWithDataFiles")
+       if err != nil {
+               return err
+       }
+
+       setToDelete := make(map[string]struct{}, len(filesToDelete))
+       for i, df := range filesToDelete {
+               if df == nil {
+                       return fmt.Errorf("nil data file at index %d for ReplaceDataFilesWithDataFiles", i)
+               }
+
+               path := df.FilePath()
+               if path == "" {
+                       return errors.New("delete data file paths must be non-empty for ReplaceDataFilesWithDataFiles")
+               }
+
+               if _, ok := setToDelete[path]; ok {
+                       return errors.New("delete data file paths must be unique for ReplaceDataFilesWithDataFiles")
+               }
+               setToDelete[path] = struct{}{}
+       }
+
+       s := t.meta.currentSnapshot()
+       if s == nil {
+               return fmt.Errorf("%w: cannot replace files in a table without an existing snapshot", ErrInvalidOperation)
+       }
+
+       fs, err := t.tbl.fsF(ctx)
+       if err != nil {
+               return err
+       }
+
+       markedForDeletion := make([]iceberg.DataFile, 0, len(setToDelete))
+       for df, err := range s.dataFiles(fs, nil) {
+               if err != nil {
+                       return err
+               }
+
+               if _, ok := setToDelete[df.FilePath()]; ok {
+                       markedForDeletion = append(markedForDeletion, df)
+               }
+
+               if _, ok := setToAdd[df.FilePath()]; ok {
+                       return fmt.Errorf("cannot add files that are already referenced by table, files: %s", df.FilePath())
+               }
+       }
+
+       if len(markedForDeletion) != len(setToDelete) {
+               return errors.New("cannot delete files that do not belong to the table")
+       }
+
+       if t.meta.NameMapping() == nil {
+               nameMapping := t.meta.CurrentSchema().NameMapping()
+               mappingJson, err := json.Marshal(nameMapping)
+               if err != nil {
+                       return err
+               }
+               err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)})
+               if err != nil {
+                       return err
+               }
+       }
+
+       commitUUID := uuid.New()
+       updater := t.updateSnapshot(fs, snapshotProps, OpOverwrite).mergeOverwrite(&commitUUID)
+
+       for _, df := range markedForDeletion {
+               updater.deleteDataFile(df)
+       }
+
+       for _, df := range filesToAdd {
+               updater.appendDataFile(df)
+       }
+
+       updates, reqs, err := updater.commit()
+       if err != nil {
+               return err
+       }
+
+       return t.apply(updates, reqs)
+}
+
 func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProps iceberg.Properties, ignoreDuplicates bool) error {
        set := make(map[string]string)
        for _, f := range files {

