This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new da9dd531 feat(views): add validations for json parsing of view metadata (#619)
da9dd531 is described below
commit da9dd531ca4c4d60021d801c52187339509ec52e
Author: Tobias Pütz <[email protected]>
AuthorDate: Mon Nov 10 17:33:39 2025 +0100
feat(views): add validations for json parsing of view metadata (#619)
Adds validation & tests to the json deserialization of view metadata
---
view/view.go | 133 +++++++++++++++++-
view/view_internal_test.go | 334 +++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 466 insertions(+), 1 deletion(-)
diff --git a/view/view.go b/view/view.go
index 18db042a..9bf7b428 100644
--- a/view/view.go
+++ b/view/view.go
@@ -18,12 +18,21 @@
package view
import (
+ "encoding/json"
+ "errors"
+ "fmt"
"iter"
"slices"
+ "strings"
"github.com/apache/iceberg-go"
)
+var (
+ ErrInvalidViewMetadata = errors.New("invalid view metadata")
+ ErrInvalidViewMetadataFormatVersion = errors.New("invalid or missing format-version in view metadata")
+)
+
// Metadata defines the format for view metadata,
// similar to how Iceberg supports a common table format for tables
type Metadata interface {
@@ -73,7 +82,7 @@ func (m *metadata) Schemas() iter.Seq[*iceberg.Schema] {
}
func (m *metadata) CurrentVersion() *Version {
- for i := range m.VersionLogList {
+ for i := range m.VersionList {
if m.VersionList[i].VersionID == m.CurrentVersionId {
return &m.VersionList[i]
}
@@ -118,3 +127,125 @@ type VersionLogEntry struct {
TimestampMs int64 `json:"timestamp-ms"`
VersionID int64 `json:"version-id"`
}
+
+func (m *metadata) preValidate() {
+ if m.SchemaList == nil {
+ m.SchemaList = []*iceberg.Schema{}
+ }
+
+ if m.VersionList == nil {
+ m.VersionList = []Version{}
+ }
+
+ if m.VersionLogList == nil {
+ m.VersionLogList = []VersionLogEntry{}
+ }
+
+ if m.Props == nil {
+ m.Props = iceberg.Properties{}
+ }
+}
+
+func (m *metadata) validate() error {
+ if m.UUID == "" {
+ return fmt.Errorf("%w: view-uuid is required", ErrInvalidViewMetadata)
+ }
+
+ if m.FmtVersion == -1 {
+ return fmt.Errorf("%w: format-version is required", ErrInvalidViewMetadataFormatVersion)
+ }
+
+ if m.FmtVersion < 1 || m.FmtVersion > 1 {
+ return fmt.Errorf("%w: format-version %d (only version 1 is supported)",
+ ErrInvalidViewMetadataFormatVersion, m.FmtVersion)
+ }
+
+ if m.Loc == "" {
+ return fmt.Errorf("%w: location is required", ErrInvalidViewMetadata)
+ }
+
+ if len(m.VersionList) == 0 {
+ return fmt.Errorf("%w: at least one version is required", ErrInvalidViewMetadata)
+ }
+
+ if m.CurrentVersionId == -1 {
+ return fmt.Errorf("%w: current-version-id is required", ErrInvalidViewMetadata)
+ }
+
+ if len(m.SchemaList) == 0 {
+ return fmt.Errorf("%w: at least one schema is required", ErrInvalidViewMetadata)
+ }
+
+ if err := m.checkCurrentVersionExists(); err != nil {
+ return err
+ }
+
+ if err := m.checkVersionSchemasExist(); err != nil {
+ return err
+ }
+
+ if err := m.checkDialectsUnique(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (m *metadata) checkCurrentVersionExists() error {
+ for _, v := range m.VersionList {
+ if v.VersionID == m.CurrentVersionId {
+ return nil
+ }
+ }
+
+ return fmt.Errorf("%w: current-version-id %d not found in versions",
+ ErrInvalidViewMetadata, m.CurrentVersionId)
+}
+
+func (m *metadata) checkVersionSchemasExist() error {
+ schemaIDs := make(map[int]bool)
+ for _, schema := range m.SchemaList {
+ schemaIDs[schema.ID] = true
+ }
+
+ for _, version := range m.VersionList {
+ if !schemaIDs[version.SchemaID] {
+ return fmt.Errorf("%w: version %d references unknown schema-id %d",
+ ErrInvalidViewMetadata, version.VersionID, version.SchemaID)
+ }
+ }
+
+ return nil
+}
+
+func (m *metadata) checkDialectsUnique() error {
+ for _, version := range m.VersionList {
+ seenDialects := make(map[string]bool)
+ for _, repr := range version.Representations {
+ dialect := strings.ToLower(repr.Dialect)
+ if seenDialects[dialect] {
+ return fmt.Errorf("%w: version %d has duplicate dialect %s",
+ ErrInvalidViewMetadata, version.VersionID, repr.Dialect)
+ }
+ seenDialects[dialect] = true
+ }
+ }
+
+ return nil
+}
+
+func (m *metadata) UnmarshalJSON(b []byte) error {
+ type Alias metadata
+ aux := (*Alias)(m)
+
+ aux.FmtVersion = -1
+ aux.CurrentVersionId = -1
+
+ if err := json.Unmarshal(b, aux); err != nil {
+ return err
+ }
+
+ m.preValidate()
+
+ return m.validate()
+}
diff --git a/view/view_internal_test.go b/view/view_internal_test.go
new file mode 100644
index 00000000..1a020102
--- /dev/null
+++ b/view/view_internal_test.go
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package view
+
+import (
+ "encoding/json"
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestValidMetadataDeserialization(t *testing.T) {
+ validJSON := `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "properties": {
+ "comment": "Daily event counts"
+ },
+ "versions": [{
+ "version-id": 1,
+ "timestamp-ms": 1573518431292,
+ "schema-id": 1,
+ "default-catalog": "prod",
+ "default-namespace": ["default"],
+ "summary": {
+ "operation": "create",
+ "engine-name": "Spark",
+ "engine-version": "3.3.2"
+ },
+ "representations": [{
+ "type": "sql",
+ "sql": "SELECT COUNT(*) FROM events",
+ "dialect": "spark"
+ }]
+ }],
+ "schemas": [{
+ "schema-id": 1,
+ "type": "struct",
+ "fields": [{
+ "id": 1,
+ "name": "event_count",
+ "required": false,
+ "type": "long"
+ }]
+ }],
+ "version-log": [{
+ "timestamp-ms": 1573518431292,
+ "version-id": 1
+ }]
+ }`
+
+ var meta metadata
+ err := json.Unmarshal([]byte(validJSON), &meta)
+ require.NoError(t, err)
+
+ assert.Equal(t, "fa6506c3-7681-40c8-86dc-e36561f83385", meta.ViewUUID())
+ assert.Equal(t, 1, meta.FormatVersion())
+ assert.Equal(t, "s3://bucket/warehouse/default.db/event_agg", meta.Location())
+ assert.Equal(t, int64(1), meta.CurrentVersionId)
+}
+
+func TestMissingViewUUID(t *testing.T) {
+ invalidJSON := `{
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "versions": [{"version-id": 1, "schema-id": 1, "timestamp-ms": 1234567890, "representations": [{"type": "sql", "sql": "SELECT 1", "dialect": "spark"}]}],
+ "schemas": [{"schema-id": 1, "type": "struct", "fields": []}],
+ "version-log": [{"timestamp-ms": 1234567890, "version-id": 1}]
+ }`
+
+ var meta metadata
+ err := json.Unmarshal([]byte(invalidJSON), &meta)
+ require.Error(t, err)
+ assert.True(t, errors.Is(err, ErrInvalidViewMetadata))
+ assert.Contains(t, err.Error(), "view-uuid is required")
+}
+
+func TestMissingLocation(t *testing.T) {
+ invalidJSON := `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "current-version-id": 1,
+ "versions": [{"version-id": 1, "schema-id": 1, "timestamp-ms": 1234567890, "representations": [{"type": "sql", "sql": "SELECT 1", "dialect": "spark"}]}],
+ "schemas": [{"schema-id": 1, "type": "struct", "fields": []}],
+ "version-log": [{"timestamp-ms": 1234567890, "version-id": 1}]
+ }`
+
+ var meta metadata
+ err := json.Unmarshal([]byte(invalidJSON), &meta)
+ require.Error(t, err)
+ assert.True(t, errors.Is(err, ErrInvalidViewMetadata))
+ assert.Contains(t, err.Error(), "location is required")
+}
+
+func TestMissingFormatVersion(t *testing.T) {
+ invalidJSON := `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "versions": [{"version-id": 1, "schema-id": 1, "timestamp-ms": 1234567890, "representations": [{"type": "sql", "sql": "SELECT 1", "dialect": "spark"}]}],
+ "schemas": [{"schema-id": 1, "type": "struct", "fields": []}],
+ "version-log": [{"timestamp-ms": 1234567890, "version-id": 1}]
+ }`
+
+ var meta metadata
+ err := json.Unmarshal([]byte(invalidJSON), &meta)
+ require.Error(t, err)
+ assert.True(t, errors.Is(err, ErrInvalidViewMetadataFormatVersion))
+ assert.Contains(t, err.Error(), "format-version is required")
+}
+
+func TestInvalidFormatVersion(t *testing.T) {
+ testCases := []struct {
+ name string
+ json string
+ }{
+ {
+ "version 0",
+ `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 0,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "versions": [{"version-id": 1, "schema-id": 1, "timestamp-ms": 1234567890, "representations": [{"type": "sql", "sql": "SELECT 1", "dialect": "spark"}]}],
+ "schemas": [{"schema-id": 1, "type": "struct", "fields": []}],
+ "version-log": [{"timestamp-ms": 1234567890, "version-id": 1}]
+ }`,
+ },
+ {
+ "version 2",
+ `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 2,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "versions": [{"version-id": 1, "schema-id": 1, "timestamp-ms": 1234567890, "representations": [{"type": "sql", "sql": "SELECT 1", "dialect": "spark"}]}],
+ "schemas": [{"schema-id": 1, "type": "struct", "fields": []}],
+ "version-log": [{"timestamp-ms": 1234567890, "version-id": 1}]
+ }`,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ var meta metadata
+ err := json.Unmarshal([]byte(tc.json), &meta)
+ require.Error(t, err)
+ assert.True(t, errors.Is(err, ErrInvalidViewMetadataFormatVersion))
+ })
+ }
+}
+
+func TestMissingVersions(t *testing.T) {
+ invalidJSON := `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "versions": [],
+ "schemas": [{"schema-id": 1, "type": "struct", "fields": []}],
+ "version-log": []
+ }`
+
+ var meta metadata
+ err := json.Unmarshal([]byte(invalidJSON), &meta)
+ require.Error(t, err)
+ assert.True(t, errors.Is(err, ErrInvalidViewMetadata))
+ assert.Contains(t, err.Error(), "at least one version is required")
+}
+
+func TestCurrentVersionNotFound(t *testing.T) {
+ invalidJSON := `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 99,
+ "versions": [{"version-id": 1, "schema-id": 1, "timestamp-ms": 1234567890, "representations": [{"type": "sql", "sql": "SELECT 1", "dialect": "spark"}]}],
+ "schemas": [{"schema-id": 1, "type": "struct", "fields": []}],
+ "version-log": [{"timestamp-ms": 1234567890, "version-id": 1}]
+ }`
+
+ var meta metadata
+ err := json.Unmarshal([]byte(invalidJSON), &meta)
+ require.Error(t, err)
+ assert.True(t, errors.Is(err, ErrInvalidViewMetadata))
+ assert.Contains(t, err.Error(), "current-version-id 99 not found")
+}
+
+func TestVersionReferencesUnknownSchema(t *testing.T) {
+ invalidJSON := `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "versions": [{"version-id": 1, "schema-id": 99, "timestamp-ms": 1234567890, "representations": [{"type": "sql", "sql": "SELECT 1", "dialect": "spark"}]}],
+ "schemas": [{"schema-id": 1, "type": "struct", "fields": []}],
+ "version-log": [{"timestamp-ms": 1234567890, "version-id": 1}]
+ }`
+
+ var meta metadata
+ err := json.Unmarshal([]byte(invalidJSON), &meta)
+ require.Error(t, err)
+ assert.True(t, errors.Is(err, ErrInvalidViewMetadata))
+ assert.Contains(t, err.Error(), "version 1 references unknown schema-id 99")
+}
+
+func TestMissingSchemas(t *testing.T) {
+ invalidJSON := `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "versions": [{"version-id": 1, "schema-id": 1, "timestamp-ms": 1234567890, "representations": [{"type": "sql", "sql": "SELECT 1", "dialect": "spark"}]}],
+ "schemas": [],
+ "version-log": [{"timestamp-ms": 1234567890, "version-id": 1}]
+ }`
+
+ var meta metadata
+ err := json.Unmarshal([]byte(invalidJSON), &meta)
+ require.Error(t, err)
+ assert.True(t, errors.Is(err, ErrInvalidViewMetadata))
+ assert.Contains(t, err.Error(), "at least one schema is required")
+}
+
+func TestDuplicateDialects(t *testing.T) {
+ invalidJSON := `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "versions": [{
+ "version-id": 1,
+ "schema-id": 1,
+ "timestamp-ms": 1234567890,
+ "representations": [
+ {"type": "sql", "sql": "SELECT 1", "dialect": "spark"},
+ {"type": "sql", "sql": "SELECT 2", "dialect": "SPARK"}
+ ]
+ }],
+ "schemas": [{"schema-id": 1, "type": "struct", "fields": []}],
+ "version-log": [{"timestamp-ms": 1234567890, "version-id": 1}]
+ }`
+
+ var meta metadata
+ err := json.Unmarshal([]byte(invalidJSON), &meta)
+ require.Error(t, err)
+ assert.True(t, errors.Is(err, ErrInvalidViewMetadata))
+ assert.Contains(t, err.Error(), "duplicate dialect")
+}
+
+func TestNilFieldsInJSON(t *testing.T) {
+ validJSON := `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "versions": [{"version-id": 1, "schema-id": 1, "timestamp-ms": 1234567890, "representations": [{"type": "sql", "sql": "SELECT 1", "dialect": "spark"}]}],
+ "schemas": [{"schema-id": 1, "type": "struct", "fields": []}],
+ "version-log": [{"timestamp-ms": 1234567890, "version-id": 1}]
+ }`
+
+ var meta metadata
+ err := json.Unmarshal([]byte(validJSON), &meta)
+ require.NoError(t, err)
+
+ assert.NotNil(t, meta.Props)
+ assert.Empty(t, meta.Props)
+}
+
+func TestMissingCurrentVersionID(t *testing.T) {
+ invalidJSON := `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "versions": [{"version-id": 1, "schema-id": 1, "timestamp-ms": 1234567890, "representations": [{"type": "sql", "sql": "SELECT 1", "dialect": "spark"}]}],
+ "schemas": [{"schema-id": 1, "type": "struct", "fields": []}],
+ "version-log": [{"timestamp-ms": 1234567890, "version-id": 1}]
+ }`
+
+ var meta metadata
+ err := json.Unmarshal([]byte(invalidJSON), &meta)
+ require.Error(t, err)
+ assert.True(t, errors.Is(err, ErrInvalidViewMetadata))
+ assert.Contains(t, err.Error(), "current-version-id is required")
+}
+
+func TestMultipleVersionsValidation(t *testing.T) {
+ validJSON := `{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 2,
+ "versions": [
+ {"version-id": 1, "schema-id": 1, "timestamp-ms": 1234567890, "representations": [{"type": "sql", "sql": "SELECT 1", "dialect": "spark"}]},
+ {"version-id": 2, "schema-id": 2, "timestamp-ms": 1234567900, "representations": [{"type": "sql", "sql": "SELECT 2", "dialect": "trino"}]}
+ ],
+ "schemas": [
+ {"schema-id": 1, "type": "struct", "fields": [{"id": 1, "name": "x", "required": false, "type": "long"}]},
+ {"schema-id": 2, "type": "struct", "fields": [{"id": 1, "name": "y", "required": false, "type": "string"}]}
+ ],
+ "version-log": [
+ {"timestamp-ms": 1234567890, "version-id": 1},
+ {"timestamp-ms": 1234567900, "version-id": 2}
+ ]
+ }`
+
+ var meta metadata
+ err := json.Unmarshal([]byte(validJSON), &meta)
+ require.NoError(t, err)
+
+ assert.Len(t, meta.VersionList, 2)
+ assert.Len(t, meta.SchemaList, 2)
+ assert.Len(t, meta.VersionLogList, 2)
+}