This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new ea71202  feat(manifest): fix partition data map (#124)
ea71202 is described below

commit ea71202437b661a91746049224fecbfee5caf4ef
Author: Matt Topol <[email protected]>
AuthorDate: Mon Aug 19 10:45:28 2024 -0400

    feat(manifest): fix partition data map (#124)
---
 io/local.go |   7 +++--
 manifest.go | 103 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 2 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/io/local.go b/io/local.go
index befa831..560d9be 100644
--- a/io/local.go
+++ b/io/local.go
@@ -17,14 +17,17 @@
 
 package io
 
-import "os"
+import (
+       "os"
+       "strings"
+)
 
 // LocalFS is an implementation of IO that implements interaction with
 // the local file system.
 type LocalFS struct{}
 
 func (LocalFS) Open(name string) (File, error) {
-       return os.Open(name)
+       // Paths read from Iceberg metadata may be file URIs. Strip the scheme
+       // from both the "file:///abs/path" form and the single-slash
+       // "file:/abs/path" form that some writers emit; any other name is
+       // used as a plain OS path.
+       switch {
+       case strings.HasPrefix(name, "file://"):
+               name = strings.TrimPrefix(name, "file://")
+       case strings.HasPrefix(name, "file:"):
+               name = strings.TrimPrefix(name, "file:")
+       }
+       return os.Open(name)
 }
 
 func (LocalFS) Remove(name string) error {
diff --git a/manifest.go b/manifest.go
index 3f4b8ca..5340aa5 100644
--- a/manifest.go
+++ b/manifest.go
@@ -20,6 +20,7 @@ package iceberg
 import (
        "io"
        "sync"
+       "time"
 
        iceio "github.com/apache/iceberg-go/io"
 
@@ -362,6 +363,28 @@ func (m *manifestFileV2) FetchEntries(fs iceio.IO, discardDeleted bool) ([]Manif
        return fetchManifestEntries(m, fs, discardDeleted)
 }
 
+// getFieldIDMap returns the Iceberg field ID of each partition field, keyed
+// by field name. The IDs are read from the "field-id" property of the fields
+// of the data_file.partition record in a manifest entry's Avro schema.
+//
+// The schema comes from the manifest file being read. If it does not have
+// the expected record -> data_file -> partition record shape, an empty map
+// is returned instead of panicking on an unchecked type assertion or a nil
+// field. Fields without a numeric "field-id" property are omitted.
+func getFieldIDMap(sc avro.Schema) map[string]int {
+       result := make(map[string]int)
+
+       // field returns the field called name of s, or nil if s is not a
+       // record schema or has no such field.
+       field := func(s avro.Schema, name string) *avro.Field {
+               rs, ok := s.(*avro.RecordSchema)
+               if !ok {
+                       return nil
+               }
+               for _, f := range rs.Fields() {
+                       if f.Name() == name {
+                               return f
+                       }
+               }
+               return nil
+       }
+
+       dataFileField := field(sc, "data_file")
+       if dataFileField == nil {
+               return result
+       }
+       partitionField := field(dataFileField.Type(), "partition")
+       if partitionField == nil {
+               return result
+       }
+       partitionRec, ok := partitionField.Type().(*avro.RecordSchema)
+       if !ok {
+               return result
+       }
+
+       for _, f := range partitionRec.Fields() {
+               // The property is expected to be a float64 (a decoded JSON
+               // number); integer types are accepted as well in case the
+               // schema parser preserves them.
+               switch fid := f.Prop("field-id").(type) {
+               case float64:
+                       result[f.Name()] = int(fid)
+               case int:
+                       result[f.Name()] = fid
+               case int64:
+                       result[f.Name()] = int(fid)
+               }
+       }
+       return result
+}
+
 func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) 
([]ManifestEntry, error) {
        f, err := fs.Open(m.FilePath())
        if err != nil {
@@ -375,15 +398,16 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]M
        }
 
        metadata := dec.Metadata()
+       sc, err := avro.ParseBytes(dec.Metadata()["avro.schema"])
+       if err != nil {
+               return nil, err
+       }
+
+       fieldNameToID := getFieldIDMap(sc)
        isVer1, isFallback := true, false
        if string(metadata["format-version"]) == "2" {
                isVer1 = false
        } else {
-               sc, err := avro.ParseBytes(dec.Metadata()["avro.schema"])
-               if err != nil {
-                       return nil, err
-               }
-
                for _, f := range sc.(*avro.RecordSchema).Fields() {
                        if f.Name() == "snapshot_id" {
                                if f.Type().Type() == avro.Union {
@@ -417,6 +441,7 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]M
 
                if !discardDeleted || tmp.Status() != EntryStatusDELETED {
                        tmp.inheritSeqNum(m)
+                       tmp.DataFile().setFieldNameToIDMap(fieldNameToID)
                        results = append(results, tmp)
                }
        }
@@ -557,6 +582,19 @@ const (
        EntryContentEqDeletes  ManifestEntryContent = 2
 )
 
+// String implements fmt.Stringer. Any value other than the three defined
+// content kinds renders as "UNKNOWN".
+func (m ManifestEntryContent) String() string {
+       name := "UNKNOWN"
+       switch m {
+       case EntryContentData:
+               name = "Data"
+       case EntryContentPosDeletes:
+               name = "Positional_Deletes"
+       case EntryContentEqDeletes:
+               name = "Equality_Deletes"
+       }
+       return name
+}
+
 // FileFormat defines constants for the format of data files.
 type FileFormat string
 
@@ -583,6 +621,51 @@ func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V {
        return out
 }
 
+// avroPartitionData flattens partition data decoded by hamba/avro/v2 into
+// plain Iceberg partition values.
+//
+// hamba/avro/v2 will unmarshal a map[string]any such that each entry will
+// actually be a map[string]any with the key being the avro type, not the
+// field name.
+//
+// This means that partition data that looks like this:
+//
+//  [{"field-id": 1000, "name": "ts", "type": {"type": "int", "logicalType": "date"}}]
+//
+// Becomes:
+//
+//  map[string]any{"ts": map[string]any{"int.date": time.Time{}}}
+//
+// so each wrapped entry is unwrapped here, and recognized logical types are
+// converted:
+//   - "int.date" becomes a Date (days since the Unix epoch).
+//   - "int.time-millis" and "long.time-micros" become a Time (microseconds).
+//   - "long.timestamp-millis" and "long.timestamp-micros" become a Timestamp
+//     (microseconds since the epoch, UTC).
+//
+// Every other wrapped value is kept as decoded rather than dropped or
+// causing a panic. This includes decimals, which are not converted yet, and
+// values whose decoded Go type is not the expected one. Entries that are
+// not wrapped maps (e.g. nil) are copied unchanged.
+func avroPartitionData(input map[string]any) map[string]any {
+       const secondsPerDay = int64(24 * time.Hour / time.Second)
+
+       out := make(map[string]any, len(input))
+       for k, v := range input {
+               wrapped, ok := v.(map[string]any)
+               if !ok {
+                       out[k] = v
+                       continue
+               }
+
+               for typeName, val := range wrapped {
+                       // Fallback: keep the raw decoded value. This also covers
+                       // "bytes.decimal" and "fixed.decimal", which are not
+                       // converted yet; previously their values were dropped.
+                       out[k] = val
+
+                       switch typeName {
+                       case "int.date":
+                               if t, ok := val.(time.Time); ok {
+                                       out[k] = Date(t.Truncate(24*time.Hour).Unix() / secondsPerDay)
+                               }
+                       case "int.time-millis", "long.time-micros":
+                               if d, ok := val.(time.Duration); ok {
+                                       out[k] = Time(d.Microseconds())
+                               }
+                       case "long.timestamp-millis", "long.timestamp-micros":
+                               if t, ok := val.(time.Time); ok {
+                                       out[k] = Timestamp(t.UTC().UnixMicro())
+                               }
+                       }
+               }
+       }
+       return out
+}
+
 type dataFile struct {
        Content          ManifestEntryContent   `avro:"content"`
        Path             string                 `avro:"file_path"`
@@ -611,6 +694,11 @@ type dataFile struct {
        lowerBoundMap  map[int][]byte
        upperBoundMap  map[int][]byte
 
+       // not used for anything yet, but important to maintain the information
+       // for future development and updates such as when we get to writes,
+       // and scan planning
+       fieldNameToID map[string]int
+
        initMaps sync.Once
 }
 
@@ -623,9 +711,12 @@ func (d *dataFile) initializeMapData() {
                d.distinctCntMap = avroColMapToMap(d.DistinctCounts)
                d.lowerBoundMap = avroColMapToMap(d.LowerBounds)
                d.upperBoundMap = avroColMapToMap(d.UpperBounds)
+               d.PartitionData = avroPartitionData(d.PartitionData)
        })
 }
 
+// setFieldNameToIDMap stores m as this data file's partition field name ->
+// field ID mapping. The map is kept by reference, not copied, so callers may
+// share one map across all data files of a manifest.
+func (d *dataFile) setFieldNameToIDMap(m map[string]int) { d.fieldNameToID = m }
+
 func (d *dataFile) ContentType() ManifestEntryContent { return d.Content }
 func (d *dataFile) FilePath() string                  { return d.Path }
 func (d *dataFile) FileFormat() FileFormat            { return d.Format }
@@ -831,6 +922,8 @@ type DataFile interface {
        // SortOrderID returns the id representing the sort order for this
        // file, or nil if there is no sort order.
        SortOrderID() *int
+
+       setFieldNameToIDMap(map[string]int)
 }
 
 // ManifestEntry is an interface for both v1 and v2 manifest entries.

Reply via email to