ferhatelmas commented on code in PR #723:
URL: https://github.com/apache/iceberg-go/pull/723#discussion_r2830650003
##########
table/transaction.go:
##########
@@ -435,6 +435,254 @@ func (t *Transaction) ReplaceDataFiles(ctx
context.Context, filesToDelete, files
return t.apply(updates, reqs)
}
+// validateDataFilePartitionData verifies that DataFile partition values match
+// the current partition spec fields by ID without reading file contents.
+func validateDataFilePartitionData(df iceberg.DataFile, spec
*iceberg.PartitionSpec) error {
+ partitionData := df.Partition()
+ if partitionData == nil {
+ partitionData = map[int]any{}
+ }
+
+ expectedFieldIDs := make(map[int]string)
+ for field := range spec.Fields() {
+ expectedFieldIDs[field.FieldID] = field.Name
+ if _, ok := partitionData[field.FieldID]; !ok {
+ return fmt.Errorf("missing partition value for field id
%d (%s)", field.FieldID, field.Name)
+ }
+ }
+
+ for fieldID := range partitionData {
+ if _, ok := expectedFieldIDs[fieldID]; !ok {
+ return fmt.Errorf("unknown partition field id %d for
spec id %d", fieldID, spec.ID())
+ }
+ }
+
+ return nil
+}
+
+// validateDataFilesToAdd performs metadata-only validation for caller-provided
+// DataFiles and returns a set of paths that passed validation.
+func (t *Transaction) validateDataFilesToAdd(dataFiles []iceberg.DataFile,
operation string) (map[string]struct{}, error) {
+ currentSpec, err := t.meta.CurrentSpec()
+ if err != nil || currentSpec == nil {
+ return nil, fmt.Errorf("could not get current partition spec:
%w", err)
+ }
+
+ expectedSpecID := int32(currentSpec.ID())
+ setToAdd := make(map[string]struct{}, len(dataFiles))
+
+ for i, df := range dataFiles {
+ if df == nil {
+ return nil, fmt.Errorf("nil data file at index %d for
%s", i, operation)
+ }
+
+ path := df.FilePath()
+ if path == "" {
+ return nil, fmt.Errorf("data file path cannot be empty
for %s", operation)
+ }
+
+ if _, ok := setToAdd[path]; ok {
+ return nil, fmt.Errorf("add data file paths must be
unique for %s", operation)
+ }
+ setToAdd[path] = struct{}{}
+
+ if df.ContentType() != iceberg.EntryContentData {
+ return nil, fmt.Errorf("adding files other than data
files is not yet implemented: file %s has content type %s for %s", path,
df.ContentType(), operation)
+ }
+
+ switch df.FileFormat() {
+ case iceberg.ParquetFile, iceberg.OrcFile, iceberg.AvroFile:
+ default:
+ return nil, fmt.Errorf("data file %s has invalid file
format %s for %s", path, df.FileFormat(), operation)
+ }
+
+ if df.SpecID() != expectedSpecID {
+ return nil, fmt.Errorf("data file %s has invalid
partition spec id %d for %s: expected %d",
+ path, df.SpecID(), operation, expectedSpecID)
+ }
+
+ if err := validateDataFilePartitionData(df, currentSpec); err
!= nil {
+ return nil, fmt.Errorf("data file %s has invalid
partition data for %s: %w", path, operation, err)
+ }
+ }
+
+ return setToAdd, nil
+}
+
+// AddDataFiles adds pre-built DataFiles to the table without scanning them
from storage.
+// This is useful for clients who have already constructed DataFile objects
with metadata,
+// avoiding the need to read files to extract schema and statistics.
+//
+// Unlike AddFiles, this method does not read files from storage. It validates
only metadata
+// that can be checked without opening files (for example spec-id and
partition field IDs).
+//
+// Callers are responsible for ensuring each DataFile is valid and consistent
with the table.
+// Supplying incorrect DataFile metadata can produce an invalid snapshot and
break reads.
+func (t *Transaction) AddDataFiles(ctx context.Context, dataFiles
[]iceberg.DataFile, snapshotProps iceberg.Properties) error {
+ if len(dataFiles) == 0 {
+ return nil
+ }
+
+ setToAdd, err := t.validateDataFilesToAdd(dataFiles, "AddDataFiles")
+ if err != nil {
+ return err
+ }
+
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return err
+ }
+
+ if s := t.meta.currentSnapshot(); s != nil {
+ referenced := make([]string, 0)
+ for df, err := range s.dataFiles(fs, nil) {
+ if err != nil {
+ return err
+ }
+
+ if _, ok := setToAdd[df.FilePath()]; ok {
+ referenced = append(referenced, df.FilePath())
+ }
+ }
+
+ if len(referenced) > 0 {
+ return fmt.Errorf("cannot add files that are already
referenced by table, files: %s", referenced)
Review Comment:
`referenced` is a slice, not a string. `%v` would be better, or it could be converted to
a string with `strings.Join` (e.g. `strings.Join(referenced, ", ")`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]