This is an automated email from the ASF dual-hosted git repository. zeroshade pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:

     new 8df2e71 chore(manifests): Add test to verify that no avro schema cache is used with manifest files (#402)

8df2e71 is described below

commit 8df2e71ec75f0208aaa217c9f09150a9826acd24
Author: Joshua Humphries <2035234+jh...@users.noreply.github.com>
AuthorDate: Fri Apr 25 12:21:33 2025 -0400

    chore(manifests): Add test to verify that no avro schema cache is used with manifest files (#402)

    This is a follow-up to #385, which added a similar test for manifest _lists_
    (but not for manifest files).

    I verified the efficacy of this new test by commenting out the calls to
    `ocf.WithDecoderSchemaCache` in `readManifestEntries` and `NewManifestWriter`
    (which were added in #385). When they are commented out, the process uses
    avro's default global cache and the test then fails (as expected) with the
    following:

    ```
    === RUN TestManifests/TestReadManifestIncompleteSchema
        manifest_test.go:911:
            Error Trace: /Users/jhumphries/src/iceberg-go/manifest_test.go:911
            Error: An error is expected but got nil.
Test: TestManifests/TestReadManifestIncompleteSchema --- FAIL: TestManifests/TestReadManifestIncompleteSchema (0.00s) ``` --- manifest.go | 4 ++ manifest_test.go | 120 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+) diff --git a/manifest.go b/manifest.go index 6b0f4a9..e09e892 100644 --- a/manifest.go +++ b/manifest.go @@ -450,6 +450,10 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]M } defer f.Close() + return readManifestEntries(m, f, discardDeleted) +} + +func readManifestEntries(m ManifestFile, f io.Reader, discardDeleted bool) ([]ManifestEntry, error) { dec, err := ocf.NewDecoder(f, ocf.WithDecoderSchemaCache(&avro.SchemaCache{})) if err != nil { return nil, err diff --git a/manifest_test.go b/manifest_test.go index d065ab7..aeda897 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -791,6 +791,126 @@ func (m *ManifestTestSuite) TestReadManifestListIncompleteSchema() { m.ErrorContains(err, "unknown type: field_summary") } +func (m *ManifestTestSuite) TestReadManifestIncompleteSchema() { + // This prevents a regression that could be caused by using a schema cache + // across multiple read/write operations of an avro file. While it may sound + // like a reasonable idea (caches speed things up, right?), it isn't that + // sort of cache: it's really a resolver to allow files with incomplete + // schemas, which we don't want. + + // If a schema cache *were* in use, this would populate it with a definition for + // the missing record type in the incomplete schema. So we'll first "warm up" + // any cache. (Note: if working correctly, this will have no such side effect.) 
+ var buf bytes.Buffer + partitionSpec := NewPartitionSpecID(1) + snapshotID := int64(12345678) + seqNum := int64(9876) + dataFileBuilder, err := NewDataFileBuilder( + partitionSpec, + EntryContentData, + "s3://bucket/namespace/table/data/abcd-0123.parquet", + ParquetFile, + map[string]any{}, + 100, + 100*1000*1000, + ) + m.NoError(err) + file, err := WriteManifest( + "s3://bucket/namespace/table/metadata/abcd-0123.avro", &buf, 2, + partitionSpec, + NewSchema(123, + NestedField{ID: 1, Name: "id", Type: Int64Type{}}, + NestedField{ID: 2, Name: "name", Type: StringType{}}, + ), + snapshotID, + []ManifestEntry{NewManifestEntry( + EntryStatusADDED, + &snapshotID, + &seqNum, &seqNum, + dataFileBuilder.Build(), + )}, + ) + m.NoError(err) + + entries, err := readManifestEntries(file, &buf, false) + m.NoError(err) + m.Len(entries, 1) + + // This schema is that of a v2 manifest file, except that it refers to + // a type named "r2" for the "data_file" field, instead of actually + // including the definition of the "data_file" record type. + // This omission should result in an error. But if a schema cache were + // in use, this could get resolved based on a type of the same name read + // from a file that defined it. 
+ incompleteSchema := ` + { + "name": "manifest_entry", + "type": "record", + "fields": [ + { + "name": "status", + "type": "int", + "field-id": 0 + }, + { + "name": "snapshot_id", + "type": [ + "null", + "long" + ], + "field-id": 1 + }, + { + "name": "sequence_number", + "type": [ + "null", + "long" + ], + "field-id": 3 + }, + { + "name": "file_sequence_number", + "type": [ + "null", + "long" + ], + "field-id": 4 + }, + { + "name": "data_file", + "type": "r2", + "field-id": 2 + } + ] + }` + + // We'll generate a file that is missing part of its schema + cache := &avro.SchemaCache{} + partitionSchema, err := avro.NewRecordSchema("r102", "", nil) // empty struct + m.NoError(err) + sch, err := internal.NewManifestEntrySchema(partitionSchema, 2) + m.NoError(err) + enc, err := ocf.NewEncoderWithSchema(sch, &buf, + ocf.WithEncoderSchemaCache(cache), + ocf.WithSchemaMarshaler(func(schema avro.Schema) ([]byte, error) { + return []byte(incompleteSchema), nil + }), + ocf.WithMetadata(map[string][]byte{ + "format-version": {'2'}, + // TODO: spec says other things are required, like schema and partition-spec info, + // but this package currently only looks at this one value when reading... + }), + ) + m.NoError(err) + for _, entry := range entries { + m.NoError(enc.Encode(entry)) + } + + // This should fail because the file's schema is incomplete. + _, err = readManifestEntries(file, &buf, false) + m.ErrorContains(err, "unknown type: r2") +} + func (m *ManifestTestSuite) TestManifestEntriesV2() { var mockfs internal.MockFS manifest := manifestFile{