This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new ea71202 feat(manifest): fix partition data map (#124)
ea71202 is described below
commit ea71202437b661a91746049224fecbfee5caf4ef
Author: Matt Topol <[email protected]>
AuthorDate: Mon Aug 19 10:45:28 2024 -0400
feat(manifest): fix partition data map (#124)
---
io/local.go | 7 +++--
manifest.go | 103 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
2 files changed, 103 insertions(+), 7 deletions(-)
diff --git a/io/local.go b/io/local.go
index befa831..560d9be 100644
--- a/io/local.go
+++ b/io/local.go
@@ -17,14 +17,17 @@
package io
-import "os"
+import (
+ "os"
+ "strings"
+)
// LocalFS is an implementation of IO backed by the local file system,
// delegating to the os package. It has no fields, so the zero value
// (LocalFS{}) is ready to use.
type LocalFS struct{}
func (LocalFS) Open(name string) (File, error) {
- return os.Open(name)
+ return os.Open(strings.TrimPrefix(name, "file://"))
}
func (LocalFS) Remove(name string) error {
diff --git a/manifest.go b/manifest.go
index 3f4b8ca..5340aa5 100644
--- a/manifest.go
+++ b/manifest.go
@@ -20,6 +20,7 @@ package iceberg
import (
"io"
"sync"
+ "time"
iceio "github.com/apache/iceberg-go/io"
@@ -362,6 +363,28 @@ func (m *manifestFileV2) FetchEntries(fs iceio.IO, discardDeleted bool) ([]Manif
return fetchManifestEntries(m, fs, discardDeleted)
}
+func getFieldIDMap(sc avro.Schema) map[string]int {
+ getField := func(rs *avro.RecordSchema, name string) *avro.Field {
+ for _, f := range rs.Fields() {
+ if f.Name() == name {
+ return f
+ }
+ }
+ return nil
+ }
+
+ result := make(map[string]int)
+ entryField := getField(sc.(*avro.RecordSchema), "data_file")
+ partitionField := getField(entryField.Type().(*avro.RecordSchema),
"partition")
+
+ for _, field := range
partitionField.Type().(*avro.RecordSchema).Fields() {
+ if fid, ok := field.Prop("field-id").(float64); ok {
+ result[field.Name()] = int(fid)
+ }
+ }
+ return result
+}
+
func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool)
([]ManifestEntry, error) {
f, err := fs.Open(m.FilePath())
if err != nil {
@@ -375,15 +398,16 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]M
}
metadata := dec.Metadata()
+ sc, err := avro.ParseBytes(dec.Metadata()["avro.schema"])
+ if err != nil {
+ return nil, err
+ }
+
+ fieldNameToID := getFieldIDMap(sc)
isVer1, isFallback := true, false
if string(metadata["format-version"]) == "2" {
isVer1 = false
} else {
- sc, err := avro.ParseBytes(dec.Metadata()["avro.schema"])
- if err != nil {
- return nil, err
- }
-
for _, f := range sc.(*avro.RecordSchema).Fields() {
if f.Name() == "snapshot_id" {
if f.Type().Type() == avro.Union {
@@ -417,6 +441,7 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]M
if !discardDeleted || tmp.Status() != EntryStatusDELETED {
tmp.inheritSeqNum(m)
+ tmp.DataFile().setFieldNameToIDMap(fieldNameToID)
results = append(results, tmp)
}
}
@@ -557,6 +582,19 @@ const (
EntryContentEqDeletes ManifestEntryContent = 2
)
+func (m ManifestEntryContent) String() string {
+ switch m {
+ case EntryContentData:
+ return "Data"
+ case EntryContentPosDeletes:
+ return "Positional_Deletes"
+ case EntryContentEqDeletes:
+ return "Equality_Deletes"
+ default:
+ return "UNKNOWN"
+ }
+}
+
// FileFormat defines constants for the format of data files.
type FileFormat string
@@ -583,6 +621,51 @@ func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V {
return out
}
+func avroPartitionData(input map[string]any) map[string]any {
+ // hambra/avro/v2 will unmarshal a map[string]any such that
+ // each entry will actually be a map[string]any with the key being
+ // the avro type, not the field name.
+ //
+ // This means that partition data that looks like this:
+ //
+ // [{"field-id": 1000, "name": "ts", "type": {"type": "int",
"logicalType": "date"}}]
+ //
+ // Becomes:
+ //
+ // map[string]any{"ts": map[string]any{"int.date": time.Time{}}}
+ //
+ // so we need to simplify our map and make the partition data handling
easier
+ out := make(map[string]any)
+ for k, v := range input {
+ switch v := v.(type) {
+ case map[string]any:
+ for typeName, val := range v {
+ switch typeName {
+ case "int.date":
+ out[k] =
Date(val.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour *
24).Seconds()))
+ case "int.time-millis":
+ out[k] =
Time(val.(time.Duration).Microseconds())
+ case "long.time-micros":
+ out[k] =
Time(val.(time.Duration).Microseconds())
+ case "long.timestamp-millis":
+ out[k] =
Timestamp(val.(time.Time).UTC().UnixMicro())
+ case "long.timestamp-micros":
+ out[k] =
Timestamp(val.(time.Time).UTC().UnixMicro())
+ case "bytes.decimal":
+ // not implemented yet
+ case "fixed.decimal":
+ // not implemented yet
+ default:
+ out[k] = val
+ }
+ }
+ default:
+ out[k] = v
+ }
+ }
+ return out
+}
+
type dataFile struct {
Content ManifestEntryContent `avro:"content"`
Path string `avro:"file_path"`
@@ -611,6 +694,11 @@ type dataFile struct {
lowerBoundMap map[int][]byte
upperBoundMap map[int][]byte
+ // not used for anything yet, but important to maintain the information
+ // for future development and updates such as when we get to writes,
+ // and scan planning
+ fieldNameToID map[string]int
+
initMaps sync.Once
}
@@ -623,9 +711,12 @@ func (d *dataFile) initializeMapData() {
d.distinctCntMap = avroColMapToMap(d.DistinctCounts)
d.lowerBoundMap = avroColMapToMap(d.LowerBounds)
d.upperBoundMap = avroColMapToMap(d.UpperBounds)
+ d.PartitionData = avroPartitionData(d.PartitionData)
})
}
+func (d *dataFile) setFieldNameToIDMap(m map[string]int) { d.fieldNameToID = m
}
+
// ContentType returns the kind of content this file holds (data, positional
// deletes, or equality deletes).
func (d *dataFile) ContentType() ManifestEntryContent {
	return d.Content
}

// FilePath returns the file location read from the entry's "file_path" field.
func (d *dataFile) FilePath() string {
	return d.Path
}

// FileFormat returns the storage format of the file.
func (d *dataFile) FileFormat() FileFormat {
	return d.Format
}
@@ -831,6 +922,8 @@ type DataFile interface {
// SortOrderID returns the id representing the sort order for this
// file, or nil if there is no sort order.
SortOrderID() *int
+
+ setFieldNameToIDMap(map[string]int)
}
// ManifestEntry is an interface for both v1 and v2 manifest entries.