This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new a5765060 feat: add functions for adding and replacing data directly with datafiles (#723)
a5765060 is described below
commit a57650605af2d2aaf50a3a93bb2647794769c998
Author: Adam Gaddis <[email protected]>
AuthorDate: Thu Feb 19 13:43:03 2026 -0600
feat: add functions for adding and replacing data directly with datafiles (#723)
# Context
If you want to write your own parquet files and only use Iceberg to handle
the metadata, you are (for the most part) left with the `ReplaceDataFiles`
function. It takes a list of existing file paths and a list of new file paths
whose data replaces the old data.
This works fine most of the time, but the function performs a scan of the new
files, meaning it does not simply take your word that they match the table
schema.
That scan becomes problematic when you are writing files very quickly and
leveraging multipart uploads. You know the location of every file and know
they are valid parquet files, but the commit can still fail because a file
might not be fully available at commit time.
The error looks something like this at commit time: `failed to replace
data files: error encountered during file conversion: parquet: could not
read 8 bytes from end of file`.
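
For reference, the existing path-based flow looks roughly like this (a minimal
sketch; the paths are placeholders, and `tbl`/`ctx` are assumed to be an open
table and a context):

```go
// Existing API: ReplaceDataFiles takes plain file paths. Each new path is
// opened and scanned so the library can derive the DataFile metadata itself,
// which is where the "could not read 8 bytes from end of file" error surfaces
// when an object is not fully available yet.
tx := tbl.NewTransaction()
if err := tx.ReplaceDataFiles(ctx,
	[]string{"s3://bucket/table/data/old-0.parquet"}, // existing files to remove
	[]string{"s3://bucket/table/data/new-0.parquet"}, // new files, scanned by the library
	nil, // snapshot properties
); err != nil {
	return err
}
if _, err := tx.Commit(ctx); err != nil {
	return err
}
```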
# Solution
We have tested this out in vendor code and opened a fork that adds these new
functions.
`ReplaceDataFiles` scans the given file paths to verify that the schema of
those files matches the schema of the table they are being added to.
We, and I would assume many people writing their own parquet files, don't
need this. Our ingestion framework guarantees we will never produce an
incorrect parquet file, and we have access to both the Parquet schema and the
Arrow schema for the entirety of the ingestion.
So I can build DataFiles directly and would much rather pass them in myself,
since I know the files will eventually be available and will be correct. All
this does is tell the metadata where to find each file; there is no real harm
in committing before a file is fully available unless you query it right away
and it happens not to be there yet.
This also speeds up commits tremendously, since the library no longer needs
to scan every file for every single commit.
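
Roughly, the new flow looks like this (a minimal sketch based on the functions
added below; the path, record count, and file size are placeholders, and the
two nil arguments after the partition map mirror the optional arguments the
tests also leave unset):

```go
// Build a DataFile purely from metadata the writer already knows;
// nothing is read back from object storage.
builder, err := iceberg.NewDataFileBuilder(
	*iceberg.UnpartitionedSpec,
	iceberg.EntryContentData,
	"s3://bucket/table/data/new-0.parquet", // placeholder path
	iceberg.ParquetFile,
	nil,      // partition values (none for an unpartitioned spec)
	nil, nil, // optional arguments, left unset as in the tests below
	1000,     // record count known from the writer
	523412,   // file size in bytes known from the writer
)
if err != nil {
	return err
}
df := builder.Build()

tx := tbl.NewTransaction()

// Append-only: register the new file without any scan.
if err := tx.AddDataFiles(ctx, []iceberg.DataFile{df}, nil); err != nil {
	return err
}

// Or, to swap previously committed files for new ones in one overwrite snapshot:
//   err = tx.ReplaceDataFilesWithDataFiles(ctx, filesToDelete, []iceberg.DataFile{df}, nil)

if _, err := tx.Commit(ctx); err != nil {
	return err
}
```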
Co-authored-by: Adam Gaddis <[email protected]>
---
table/table_test.go | 509 +++++++++++++++++++++++++++++++++++++++++++++++++++
table/transaction.go | 248 +++++++++++++++++++++++++
2 files changed, 757 insertions(+)
diff --git a/table/table_test.go b/table/table_test.go
index ae1bbdde..7f7e0e57 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -71,6 +71,22 @@ func mustFS(t *testing.T, tbl *table.Table) iceio.IO {
return r
}
+// mustFileSize returns the file size for a test path and fails the test on error.
+func mustFileSize(t *testing.T, path string) int64 {
+	info, err := os.Stat(path)
+	require.NoError(t, err)
+
+	return info.Size()
+}
+
+// mustDataFile builds a test DataFile and fails the test if construction fails.
+func mustDataFile(t *testing.T, spec iceberg.PartitionSpec, path string, partition map[int]any, count, size int64) iceberg.DataFile {
+	builder, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentData, path, iceberg.ParquetFile, partition, nil, nil, count, size)
+	require.NoError(t, err)
+
+	return builder.Build()
+}
+
func (t *TableTestSuite) SetupSuite() {
var mockfs internal.MockFS
mockfs.Test(t.T())
@@ -922,6 +938,499 @@ func (t *TableWritingTestSuite) TestReplaceDataFiles() {
}, staged.CurrentSnapshot().Summary)
}
+func (t *TableWritingTestSuite) TestAddDataFiles() {
+	ident := table.Identifier{"default", "add_data_files_v" + strconv.Itoa(t.formatVersion)}
+	tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+	filePath := fmt.Sprintf("%s/add_data_files_v%d/test.parquet", t.location, t.formatVersion)
+	t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), filePath, t.arrTbl)
+
+	df := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, filePath, nil, 1, mustFileSize(t.T(), filePath))
+
+	tx := tbl.NewTransaction()
+	t.Require().NoError(tx.AddDataFiles(t.ctx, []iceberg.DataFile{df}, nil))
+
+	staged, err := tx.StagedTable()
+	t.Require().NoError(err)
+	t.Equal(table.OpAppend, staged.CurrentSnapshot().Summary.Operation)
+	t.Equal("1", staged.CurrentSnapshot().Summary.Properties["added-data-files"])
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFiles() {
+	ident := table.Identifier{"default", "replace_data_files_with_datafiles_v" + strconv.Itoa(t.formatVersion)}
+	tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)
+
+	files := make([]string, 0, 3)
+	for i := range 3 {
+		filePath := fmt.Sprintf("%s/replace_data_files_with_datafiles_v%d/data-%d.parquet", t.location, t.formatVersion, i)
+		t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), filePath, t.arrTbl)
+		files = append(files, filePath)
+	}
+
+	tx := tbl.NewTransaction()
+	t.Require().NoError(tx.AddFiles(t.ctx, files, nil, false))
+	tbl, err := tx.Commit(t.ctx)
+	t.Require().NoError(err)
+
+	replacementPath := fmt.Sprintf("%s/replace_data_files_with_datafiles_v%d/replacement.parquet", t.location, t.formatVersion)
+	t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), replacementPath, t.arrTbl)
+
+	deleteFiles := []iceberg.DataFile{
+		mustDataFile(t.T(), *iceberg.UnpartitionedSpec, files[0], nil, 1, mustFileSize(t.T(), files[0])),
+		mustDataFile(t.T(), *iceberg.UnpartitionedSpec, files[1], nil, 1, mustFileSize(t.T(), files[1])),
+	}
+	addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, replacementPath, nil, 1, mustFileSize(t.T(), replacementPath))
+
+	tx = tbl.NewTransaction()
+	t.Require().NoError(tx.ReplaceDataFilesWithDataFiles(t.ctx, deleteFiles, []iceberg.DataFile{addFile}, nil))
+
+	staged, err := tx.StagedTable()
+	t.Require().NoError(err)
+	t.Equal(table.OpOverwrite, staged.CurrentSnapshot().Summary.Operation)
+	t.Equal("1", staged.CurrentSnapshot().Summary.Properties["added-data-files"])
+	t.Equal("2", staged.CurrentSnapshot().Summary.Properties["deleted-data-files"])
+}
+
+func (t *TableWritingTestSuite)
TestReplaceDataFilesWithDataFilesValidatesPartitionSpecID() {
+ ident := table.Identifier{"default",
"replace_data_files_with_datafiles_spec_validation_v" +
strconv.Itoa(t.formatVersion)}
+ spec := iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 4, FieldID: 1000, Transform:
iceberg.IdentityTransform{}, Name: "baz"},
+ )
+ tbl := t.createTable(ident, t.formatVersion, spec, t.tableSchema)
+
+ existingPath :=
fmt.Sprintf("%s/replace_data_files_with_datafiles_spec_validation_v%d/existing.parquet",
t.location, t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil,
false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ invalidSpec := iceberg.NewPartitionSpecID(999,
+ iceberg.PartitionField{SourceID: 4, FieldID: 1000, Transform:
iceberg.IdentityTransform{}, Name: "baz"},
+ )
+
+ deleteFile := mustDataFile(t.T(), spec, existingPath, map[int]any{1000:
int32(123)}, 1, mustFileSize(t.T(), existingPath))
+ addFile := mustDataFile(t.T(), invalidSpec,
+
fmt.Sprintf("%s/replace_data_files_with_datafiles_spec_validation_v%d/replacement.parquet",
t.location, t.formatVersion),
+ map[int]any{1000: int32(123)}, 1, 1)
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFilesWithDataFiles(t.ctx,
[]iceberg.DataFile{deleteFile}, []iceberg.DataFile{addFile}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "invalid partition spec id")
+}
+
+func (t *TableWritingTestSuite)
TestReplaceDataFilesWithDataFilesValidatesPartitionData() {
+ ident := table.Identifier{"default",
"replace_data_files_with_datafiles_partition_validation_v" +
strconv.Itoa(t.formatVersion)}
+ spec := iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 4, FieldID: 1000, Transform:
iceberg.IdentityTransform{}, Name: "baz"},
+ )
+ tbl := t.createTable(ident, t.formatVersion, spec, t.tableSchema)
+
+ existingPath :=
fmt.Sprintf("%s/replace_data_files_with_datafiles_partition_validation_v%d/existing.parquet",
t.location, t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil,
false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ deleteFile := mustDataFile(t.T(), spec, existingPath, map[int]any{1000:
int32(123)}, 1, mustFileSize(t.T(), existingPath))
+ addFile := mustDataFile(t.T(), spec,
+
fmt.Sprintf("%s/replace_data_files_with_datafiles_partition_validation_v%d/replacement.parquet",
t.location, t.formatVersion),
+ map[int]any{}, 1, 1)
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFilesWithDataFiles(t.ctx,
[]iceberg.DataFile{deleteFile}, []iceberg.DataFile{addFile}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "missing partition value")
+}
+
+// ============================================================================
+// AddDataFiles Error Path Tests
+// ============================================================================
+
+func (t *TableWritingTestSuite) TestAddDataFilesDuplicateFilePaths() {
+ ident := table.Identifier{"default", "add_data_files_duplicate_v" +
strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ filePath := fmt.Sprintf("%s/add_data_files_duplicate_v%d/test.parquet",
t.location, t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), filePath,
t.arrTbl)
+
+ df := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, filePath, nil, 1,
mustFileSize(t.T(), filePath))
+
+ tx := tbl.NewTransaction()
+ err := tx.AddDataFiles(t.ctx, []iceberg.DataFile{df, df}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "add data file paths must be unique for
AddDataFiles")
+}
+
+func (t *TableWritingTestSuite) TestAddDataFilesAlreadyReferencedByTable() {
+ ident := table.Identifier{"default",
"add_data_files_already_referenced_v" + strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ filePath :=
fmt.Sprintf("%s/add_data_files_already_referenced_v%d/test.parquet",
t.location, t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), filePath,
t.arrTbl)
+
+ df := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, filePath, nil, 1,
mustFileSize(t.T(), filePath))
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddDataFiles(t.ctx, []iceberg.DataFile{df}, nil))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ tx = tbl.NewTransaction()
+ err = tx.AddDataFiles(t.ctx, []iceberg.DataFile{df}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "cannot add files that are already referenced by
table")
+}
+
+func (t *TableWritingTestSuite) TestAddDataFilesNilDataFile() {
+ ident := table.Identifier{"default", "add_data_files_nil_v" +
strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ tx := tbl.NewTransaction()
+ err := tx.AddDataFiles(t.ctx, []iceberg.DataFile{nil}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "nil data file at index 0")
+}
+
+func (t *TableWritingTestSuite) TestAddDataFilesInvalidContentType() {
+ ident := table.Identifier{"default", "add_data_files_invalid_content_v"
+ strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ filePath :=
fmt.Sprintf("%s/add_data_files_invalid_content_v%d/test.parquet", t.location,
t.formatVersion)
+
+ builder, err := iceberg.NewDataFileBuilder(*iceberg.UnpartitionedSpec,
iceberg.EntryContentPosDeletes, filePath, iceberg.ParquetFile, nil, nil, nil,
1, 100)
+ t.Require().NoError(err)
+ df := builder.Build()
+
+ tx := tbl.NewTransaction()
+ err = tx.AddDataFiles(t.ctx, []iceberg.DataFile{df}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "adding files other than data files is not yet
implemented")
+}
+
+func (t *TableWritingTestSuite) TestAddDataFilesEmptySlice() {
+ ident := table.Identifier{"default", "add_data_files_empty_v" +
strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ tx := tbl.NewTransaction()
+ err := tx.AddDataFiles(t.ctx, []iceberg.DataFile{}, nil)
+ t.NoError(err)
+
+ staged, err := tx.StagedTable()
+ t.Require().NoError(err)
+ t.Nil(staged.CurrentSnapshot())
+}
+
+// ============================================================================
+// ReplaceDataFilesWithDataFiles Error Path Tests
+// ============================================================================
+
+func (t *TableWritingTestSuite)
TestReplaceDataFilesWithDataFilesDuplicateAddPaths() {
+ ident := table.Identifier{"default", "replace_data_files_dup_add_v" +
strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ existingPath :=
fmt.Sprintf("%s/replace_data_files_dup_add_v%d/existing.parquet", t.location,
t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil,
false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ deleteFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
existingPath, nil, 1, mustFileSize(t.T(), existingPath))
+ addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+ fmt.Sprintf("%s/replace_data_files_dup_add_v%d/new.parquet",
t.location, t.formatVersion), nil, 1, 100)
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFilesWithDataFiles(t.ctx,
[]iceberg.DataFile{deleteFile}, []iceberg.DataFile{addFile, addFile}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "add data file paths must be unique for
ReplaceDataFilesWithDataFiles")
+}
+
+func (t *TableWritingTestSuite)
TestReplaceDataFilesWithDataFilesDuplicateDeletePaths() {
+ ident := table.Identifier{"default", "replace_data_files_dup_del_v" +
strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ existingPath :=
fmt.Sprintf("%s/replace_data_files_dup_del_v%d/existing.parquet", t.location,
t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil,
false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ deleteFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
existingPath, nil, 1, mustFileSize(t.T(), existingPath))
+ addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+ fmt.Sprintf("%s/replace_data_files_dup_del_v%d/new.parquet",
t.location, t.formatVersion), nil, 1, 100)
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFilesWithDataFiles(t.ctx,
[]iceberg.DataFile{deleteFile, deleteFile}, []iceberg.DataFile{addFile}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "delete data file paths must be unique for
ReplaceDataFilesWithDataFiles")
+}
+
+func (t *TableWritingTestSuite)
TestReplaceDataFilesWithDataFilesAddAlreadyReferenced() {
+ ident := table.Identifier{"default", "replace_data_files_add_ref_v" +
strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ existingPath1 :=
fmt.Sprintf("%s/replace_data_files_add_ref_v%d/existing1.parquet", t.location,
t.formatVersion)
+ existingPath2 :=
fmt.Sprintf("%s/replace_data_files_add_ref_v%d/existing2.parquet", t.location,
t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath1,
t.arrTbl)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath2,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath1,
existingPath2}, nil, false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ deleteFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
existingPath1, nil, 1, mustFileSize(t.T(), existingPath1))
+ addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
existingPath2, nil, 1, mustFileSize(t.T(), existingPath2))
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFilesWithDataFiles(t.ctx,
[]iceberg.DataFile{deleteFile}, []iceberg.DataFile{addFile}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "cannot add files that are already referenced by
table")
+}
+
+func (t *TableWritingTestSuite)
TestReplaceDataFilesWithDataFilesDeleteNotInTable() {
+ ident := table.Identifier{"default",
"replace_data_files_del_not_found_v" + strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ existingPath :=
fmt.Sprintf("%s/replace_data_files_del_not_found_v%d/existing.parquet",
t.location, t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil,
false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ nonExistentFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+
fmt.Sprintf("%s/replace_data_files_del_not_found_v%d/nonexistent.parquet",
t.location, t.formatVersion), nil, 1, 100)
+ addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+
fmt.Sprintf("%s/replace_data_files_del_not_found_v%d/new.parquet", t.location,
t.formatVersion), nil, 1, 100)
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFilesWithDataFiles(t.ctx,
[]iceberg.DataFile{nonExistentFile}, []iceberg.DataFile{addFile}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "cannot delete files that do not belong to the
table")
+}
+
+func (t *TableWritingTestSuite)
TestReplaceDataFilesWithDataFilesNilDeleteFile() {
+ ident := table.Identifier{"default", "replace_data_files_nil_del_v" +
strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ existingPath :=
fmt.Sprintf("%s/replace_data_files_nil_del_v%d/existing.parquet", t.location,
t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil,
false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+ fmt.Sprintf("%s/replace_data_files_nil_del_v%d/new.parquet",
t.location, t.formatVersion), nil, 1, 100)
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{nil},
[]iceberg.DataFile{addFile}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "nil data file at index 0")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesNilAddFile() {
+ ident := table.Identifier{"default", "replace_data_files_nil_add_v" +
strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ existingPath :=
fmt.Sprintf("%s/replace_data_files_nil_add_v%d/existing.parquet", t.location,
t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil,
false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ deleteFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
existingPath, nil, 1, mustFileSize(t.T(), existingPath))
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFilesWithDataFiles(t.ctx,
[]iceberg.DataFile{deleteFile}, []iceberg.DataFile{nil}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "nil data file at index 0")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesNoSnapshot() {
+ ident := table.Identifier{"default", "replace_data_files_no_snapshot_v"
+ strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ deleteFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+
fmt.Sprintf("%s/replace_data_files_no_snapshot_v%d/delete.parquet", t.location,
t.formatVersion), nil, 1, 100)
+ addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
+
fmt.Sprintf("%s/replace_data_files_no_snapshot_v%d/add.parquet", t.location,
t.formatVersion), nil, 1, 100)
+
+ tx := tbl.NewTransaction()
+ err := tx.ReplaceDataFilesWithDataFiles(t.ctx,
[]iceberg.DataFile{deleteFile}, []iceberg.DataFile{addFile}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "cannot replace files in a table without an
existing snapshot")
+}
+
+func (t *TableWritingTestSuite)
TestReplaceDataFilesWithDataFilesInvalidAddContentType() {
+ ident := table.Identifier{"default",
"replace_data_files_invalid_add_content_v" + strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ existingPath :=
fmt.Sprintf("%s/replace_data_files_invalid_add_content_v%d/existing.parquet",
t.location, t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil,
false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ deleteFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec,
existingPath, nil, 1, mustFileSize(t.T(), existingPath))
+
+ builder, err := iceberg.NewDataFileBuilder(*iceberg.UnpartitionedSpec,
iceberg.EntryContentPosDeletes,
+
fmt.Sprintf("%s/replace_data_files_invalid_add_content_v%d/new.parquet",
t.location, t.formatVersion),
+ iceberg.ParquetFile, nil, nil, nil, 1, 100)
+ t.Require().NoError(err)
+ invalidAddFile := builder.Build()
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFilesWithDataFiles(t.ctx,
[]iceberg.DataFile{deleteFile}, []iceberg.DataFile{invalidAddFile}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "adding files other than data files is not yet
implemented")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesWithDataFilesEmptyBoth() {
+ ident := table.Identifier{"default", "replace_data_files_empty_both_v"
+ strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ tx := tbl.NewTransaction()
+ err := tx.ReplaceDataFilesWithDataFiles(t.ctx, []iceberg.DataFile{},
[]iceberg.DataFile{}, nil)
+ t.NoError(err)
+
+ staged, err := tx.StagedTable()
+ t.Require().NoError(err)
+ t.Nil(staged.CurrentSnapshot())
+}
+
+func (t *TableWritingTestSuite)
TestReplaceDataFilesWithDataFilesDelegatesToAddDataFiles() {
+ ident := table.Identifier{"default", "replace_data_files_delegate_v" +
strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ filePath :=
fmt.Sprintf("%s/replace_data_files_delegate_v%d/test.parquet", t.location,
t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), filePath,
t.arrTbl)
+
+ addFile := mustDataFile(t.T(), *iceberg.UnpartitionedSpec, filePath,
nil, 1, mustFileSize(t.T(), filePath))
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.ReplaceDataFilesWithDataFiles(t.ctx,
[]iceberg.DataFile{}, []iceberg.DataFile{addFile}, nil))
+
+ staged, err := tx.StagedTable()
+ t.Require().NoError(err)
+ t.Equal(table.OpAppend, staged.CurrentSnapshot().Summary.Operation)
+ t.Equal("1",
staged.CurrentSnapshot().Summary.Properties["added-data-files"])
+}
+
+// ============================================================================
+// ReplaceDataFiles (string paths) Error Path Tests
+// ============================================================================
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesDuplicateDeletePaths() {
+ ident := table.Identifier{"default",
"replace_data_files_dup_del_paths_v" + strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ existingPath :=
fmt.Sprintf("%s/replace_data_files_dup_del_paths_v%d/existing.parquet",
t.location, t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil,
false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ newPath :=
fmt.Sprintf("%s/replace_data_files_dup_del_paths_v%d/new.parquet", t.location,
t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), newPath,
t.arrTbl)
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFiles(t.ctx, []string{existingPath, existingPath},
[]string{newPath}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "delete file paths must be unique for
ReplaceDataFiles")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesDuplicateAddPaths() {
+ ident := table.Identifier{"default",
"replace_data_files_dup_add_paths_v" + strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ existingPath :=
fmt.Sprintf("%s/replace_data_files_dup_add_paths_v%d/existing.parquet",
t.location, t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil,
false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ newPath :=
fmt.Sprintf("%s/replace_data_files_dup_add_paths_v%d/new.parquet", t.location,
t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), newPath,
t.arrTbl)
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFiles(t.ctx, []string{existingPath},
[]string{newPath, newPath}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "add file paths must be unique for
ReplaceDataFiles")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesAddAlreadyReferenced() {
+ ident := table.Identifier{"default",
"replace_data_files_add_already_ref_v" + strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ existingPath1 :=
fmt.Sprintf("%s/replace_data_files_add_already_ref_v%d/existing1.parquet",
t.location, t.formatVersion)
+ existingPath2 :=
fmt.Sprintf("%s/replace_data_files_add_already_ref_v%d/existing2.parquet",
t.location, t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath1,
t.arrTbl)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath2,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath1,
existingPath2}, nil, false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFiles(t.ctx, []string{existingPath1},
[]string{existingPath2}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "cannot add files that are already referenced by
table")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesDeleteNotInTable() {
+ ident := table.Identifier{"default",
"replace_data_files_del_not_in_table_v" + strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ existingPath :=
fmt.Sprintf("%s/replace_data_files_del_not_in_table_v%d/existing.parquet",
t.location, t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), existingPath,
t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(t.ctx, []string{existingPath}, nil,
false))
+ tbl, err := tx.Commit(t.ctx)
+ t.Require().NoError(err)
+
+ newPath :=
fmt.Sprintf("%s/replace_data_files_del_not_in_table_v%d/new.parquet",
t.location, t.formatVersion)
+ t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), newPath,
t.arrTbl)
+
+ nonExistentPath :=
fmt.Sprintf("%s/replace_data_files_del_not_in_table_v%d/nonexistent.parquet",
t.location, t.formatVersion)
+
+ tx = tbl.NewTransaction()
+ err = tx.ReplaceDataFiles(t.ctx, []string{nonExistentPath},
[]string{newPath}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "cannot delete files that do not belong to the
table")
+}
+
+func (t *TableWritingTestSuite) TestReplaceDataFilesNoSnapshot() {
+ ident := table.Identifier{"default",
"replace_data_files_no_snapshot_paths_v" + strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ tx := tbl.NewTransaction()
+ err := tx.ReplaceDataFiles(t.ctx, []string{"delete.parquet"},
[]string{"add.parquet"}, nil)
+ t.Error(err)
+ t.ErrorContains(err, "cannot replace files in a table without an
existing snapshot")
+}
+
func (t *TableWritingTestSuite) TestExpireSnapshots() {
fs := iceio.LocalFS{}
diff --git a/table/transaction.go b/table/transaction.go
index 06b01631..00fe7354 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -435,6 +435,254 @@ func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, files
return t.apply(updates, reqs)
}
+// validateDataFilePartitionData verifies that DataFile partition values match
+// the current partition spec fields by ID without reading file contents.
+func validateDataFilePartitionData(df iceberg.DataFile, spec *iceberg.PartitionSpec) error {
+	partitionData := df.Partition()
+	if partitionData == nil {
+		partitionData = map[int]any{}
+	}
+
+	expectedFieldIDs := make(map[int]string)
+	for field := range spec.Fields() {
+		expectedFieldIDs[field.FieldID] = field.Name
+		if _, ok := partitionData[field.FieldID]; !ok {
+			return fmt.Errorf("missing partition value for field id %d (%s)", field.FieldID, field.Name)
+		}
+	}
+
+	for fieldID := range partitionData {
+		if _, ok := expectedFieldIDs[fieldID]; !ok {
+			return fmt.Errorf("unknown partition field id %d for spec id %d", fieldID, spec.ID())
+		}
+	}
+
+	return nil
+}
+
+// validateDataFilesToAdd performs metadata-only validation for caller-provided
+// DataFiles and returns a set of paths that passed validation.
+func (t *Transaction) validateDataFilesToAdd(dataFiles []iceberg.DataFile, operation string) (map[string]struct{}, error) {
+	currentSpec, err := t.meta.CurrentSpec()
+	if err != nil || currentSpec == nil {
+		return nil, fmt.Errorf("could not get current partition spec: %w", err)
+	}
+
+	expectedSpecID := int32(currentSpec.ID())
+	setToAdd := make(map[string]struct{}, len(dataFiles))
+
+	for i, df := range dataFiles {
+		if df == nil {
+			return nil, fmt.Errorf("nil data file at index %d for %s", i, operation)
+		}
+
+		path := df.FilePath()
+		if path == "" {
+			return nil, fmt.Errorf("data file path cannot be empty for %s", operation)
+		}
+
+		if _, ok := setToAdd[path]; ok {
+			return nil, fmt.Errorf("add data file paths must be unique for %s", operation)
+		}
+		setToAdd[path] = struct{}{}
+
+		if df.ContentType() != iceberg.EntryContentData {
+			return nil, fmt.Errorf("adding files other than data files is not yet implemented: file %s has content type %s for %s", path, df.ContentType(), operation)
+		}
+
+		switch df.FileFormat() {
+		case iceberg.ParquetFile, iceberg.OrcFile, iceberg.AvroFile:
+		default:
+			return nil, fmt.Errorf("data file %s has invalid file format %s for %s", path, df.FileFormat(), operation)
+		}
+
+		if df.SpecID() != expectedSpecID {
+			return nil, fmt.Errorf("data file %s has invalid partition spec id %d for %s: expected %d",
+				path, df.SpecID(), operation, expectedSpecID)
+		}
+
+		if err := validateDataFilePartitionData(df, currentSpec); err != nil {
+			return nil, fmt.Errorf("data file %s has invalid partition data for %s: %w", path, operation, err)
+		}
+	}
+
+	return setToAdd, nil
+}
+
+// AddDataFiles adds pre-built DataFiles to the table without scanning them from storage.
+// This is useful for clients who have already constructed DataFile objects with metadata,
+// avoiding the need to read files to extract schema and statistics.
+//
+// Unlike AddFiles, this method does not read files from storage. It validates only metadata
+// that can be checked without opening files (for example spec-id and partition field IDs).
+//
+// Callers are responsible for ensuring each DataFile is valid and consistent with the table.
+// Supplying incorrect DataFile metadata can produce an invalid snapshot and break reads.
+func (t *Transaction) AddDataFiles(ctx context.Context, dataFiles []iceberg.DataFile, snapshotProps iceberg.Properties) error {
+	if len(dataFiles) == 0 {
+		return nil
+	}
+
+	setToAdd, err := t.validateDataFilesToAdd(dataFiles, "AddDataFiles")
+	if err != nil {
+		return err
+	}
+
+	fs, err := t.tbl.fsF(ctx)
+	if err != nil {
+		return err
+	}
+
+	if s := t.meta.currentSnapshot(); s != nil {
+		referenced := make([]string, 0)
+		for df, err := range s.dataFiles(fs, nil) {
+			if err != nil {
+				return err
+			}
+
+			if _, ok := setToAdd[df.FilePath()]; ok {
+				referenced = append(referenced, df.FilePath())
+			}
+		}
+
+		if len(referenced) > 0 {
+			return fmt.Errorf("cannot add files that are already referenced by table, files: %s", referenced)
+		}
+	}
+
+	if t.meta.NameMapping() == nil {
+		nameMapping := t.meta.CurrentSchema().NameMapping()
+		mappingJson, err := json.Marshal(nameMapping)
+		if err != nil {
+			return err
+		}
+		err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)})
+		if err != nil {
+			return err
+		}
+	}
+
+	appendFiles := t.appendSnapshotProducer(fs, snapshotProps)
+	for _, df := range dataFiles {
+		appendFiles.appendDataFile(df)
+	}
+
+	updates, reqs, err := appendFiles.commit()
+	if err != nil {
+		return err
+	}
+
+	return t.apply(updates, reqs)
+}
+
+// ReplaceDataFilesWithDataFiles replaces files using pre-built DataFile objects.
+// This avoids scanning files to extract schema and statistics - the caller provides
+// DataFile objects directly with all required metadata.
+//
+// For the files to add, use iceberg.NewDataFileBuilder to construct DataFile objects
+// with the appropriate metadata (path, record count, file size, partition values).
+//
+// This method does not open files. It validates only metadata that can be checked
+// without reading file contents.
+//
+// Callers are responsible for ensuring each DataFile is valid and consistent with the table.
+// Supplying incorrect DataFile metadata can produce an invalid snapshot and break reads.
+//
+// This is useful when:
+// - Files are written via a separate I/O path and metadata is already known
+// - Avoiding file scanning improves performance or reliability
+// - Working with storage systems where immediate file reads may be unreliable
+func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx context.Context, filesToDelete, filesToAdd []iceberg.DataFile, snapshotProps iceberg.Properties) error {
+	if len(filesToDelete) == 0 {
+		if len(filesToAdd) > 0 {
+			return t.AddDataFiles(ctx, filesToAdd, snapshotProps)
+		}
+
+		return nil
+	}
+
+	setToAdd, err := t.validateDataFilesToAdd(filesToAdd, "ReplaceDataFilesWithDataFiles")
+	if err != nil {
+		return err
+	}
+
+	setToDelete := make(map[string]struct{}, len(filesToDelete))
+	for i, df := range filesToDelete {
+		if df == nil {
+			return fmt.Errorf("nil data file at index %d for ReplaceDataFilesWithDataFiles", i)
+		}
+
+		path := df.FilePath()
+		if path == "" {
+			return errors.New("delete data file paths must be non-empty for ReplaceDataFilesWithDataFiles")
+		}
+
+		if _, ok := setToDelete[path]; ok {
+			return errors.New("delete data file paths must be unique for ReplaceDataFilesWithDataFiles")
+		}
+		setToDelete[path] = struct{}{}
+	}
+
+	s := t.meta.currentSnapshot()
+	if s == nil {
+		return fmt.Errorf("%w: cannot replace files in a table without an existing snapshot", ErrInvalidOperation)
+	}
+
+	fs, err := t.tbl.fsF(ctx)
+	if err != nil {
+		return err
+	}
+
+	markedForDeletion := make([]iceberg.DataFile, 0, len(setToDelete))
+	for df, err := range s.dataFiles(fs, nil) {
+		if err != nil {
+			return err
+		}
+
+		if _, ok := setToDelete[df.FilePath()]; ok {
+			markedForDeletion = append(markedForDeletion, df)
+		}
+
+		if _, ok := setToAdd[df.FilePath()]; ok {
+			return fmt.Errorf("cannot add files that are already referenced by table, files: %s", df.FilePath())
+		}
+	}
+
+	if len(markedForDeletion) != len(setToDelete) {
+		return errors.New("cannot delete files that do not belong to the table")
+	}
+
+	if t.meta.NameMapping() == nil {
+		nameMapping := t.meta.CurrentSchema().NameMapping()
+		mappingJson, err := json.Marshal(nameMapping)
+		if err != nil {
+			return err
+		}
+		err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)})
+		if err != nil {
+			return err
+		}
+	}
+
+	commitUUID := uuid.New()
+	updater := t.updateSnapshot(fs, snapshotProps, OpOverwrite).mergeOverwrite(&commitUUID)
+
+	for _, df := range markedForDeletion {
+		updater.deleteDataFile(df)
+	}
+
+	for _, df := range filesToAdd {
+		updater.appendDataFile(df)
+	}
+
+	updates, reqs, err := updater.commit()
+	if err != nil {
+		return err
+	}
+
+	return t.apply(updates, reqs)
+}
+
func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProps iceberg.Properties, ignoreDuplicates bool) error {
set := make(map[string]string)
for _, f := range files {