agaddis02 commented on code in PR #723:
URL: https://github.com/apache/iceberg-go/pull/723#discussion_r2819499983
##########
table/transaction.go:
##########
@@ -435,6 +435,254 @@ func (t *Transaction) ReplaceDataFiles(ctx
context.Context, filesToDelete, files
return t.apply(updates, reqs)
}
+// validateDataFilePartitionData verifies that DataFile partition values match
+// the current partition spec fields by ID without reading file contents.
+func validateDataFilePartitionData(df iceberg.DataFile, spec
*iceberg.PartitionSpec) error {
+ partitionData := df.Partition()
+ if partitionData == nil {
+ partitionData = map[int]any{}
+ }
+
+ expectedFieldIDs := make(map[int]string)
+ for field := range spec.Fields() {
+ expectedFieldIDs[field.FieldID] = field.Name
+ if _, ok := partitionData[field.FieldID]; !ok {
+ return fmt.Errorf("missing partition value for field id
%d (%s)", field.FieldID, field.Name)
+ }
+ }
+
+ for fieldID := range partitionData {
+ if _, ok := expectedFieldIDs[fieldID]; !ok {
+ return fmt.Errorf("unknown partition field id %d for
spec id %d", fieldID, spec.ID())
+ }
+ }
+
+ return nil
+}
+
+// validateDataFilesToAdd performs metadata-only validation for caller-provided
+// DataFiles and returns a set of paths that passed validation.
+func (t *Transaction) validateDataFilesToAdd(dataFiles []iceberg.DataFile,
operation string) (map[string]struct{}, error) {
+ currentSpec, err := t.meta.CurrentSpec()
+ if err != nil || currentSpec == nil {
+ return nil, fmt.Errorf("could not get current partition spec:
%w", err)
+ }
+
+ expectedSpecID := int32(currentSpec.ID())
+ setToAdd := make(map[string]struct{}, len(dataFiles))
+
+ for i, df := range dataFiles {
+ if df == nil {
+ return nil, fmt.Errorf("nil data file at index %d for
%s", i, operation)
+ }
+
+ path := df.FilePath()
+ if path == "" {
+ return nil, fmt.Errorf("data file path cannot be empty
for %s", operation)
+ }
+
+ if _, ok := setToAdd[path]; ok {
+ return nil, fmt.Errorf("add data file paths must be
unique for %s", operation)
+ }
+ setToAdd[path] = struct{}{}
+
+ if df.ContentType() != iceberg.EntryContentData {
+ return nil, fmt.Errorf("data file %s has invalid
content type %s for %s", path, df.ContentType(), operation)
+ }
Review Comment:
No particular reason. If you wanted to support deleting files here as well, I
think you'd want the function to be a bit more generic; i.e., I wouldn't include
`add` in the name of the function if it also allowed for delete operations.
I could take the base of the function and make it a generic function, then have
separate add and delete functions that use that base?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]