This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 3048d772ae Flink: Dynamic Sink: Fix serialization issues with schemas larger than 2^16 bytes (#14880)
3048d772ae is described below

commit 3048d772aed6572bce28abbddae661d441058f74
Author: Maximilian Michels <[email protected]>
AuthorDate: Mon Jan 5 14:22:39 2026 +0100

    Flink: Dynamic Sink: Fix serialization issues with schemas larger than 2^16 bytes (#14880)
---
 .../dynamic/DynamicRecordInternalSerializer.java   | 103 +++++++++++++++---
 .../flink/source/split/IcebergSourceSplit.java     |   1 +
 .../{source/split => util}/SerializerHelper.java   |  15 +--
 .../DynamicRecordInternalSerializerTestBase.java   |   8 +-
 .../TestDynamicRecordInternalSerializer.java       | 119 +++++++++++++++++++++
 ...DynamicRecordInternalSerializerWriteSchema.java |   4 +-
 ...namicRecordInternalSerializerWriteSchemaId.java |   4 +-
 ...ordInternalSerializerWriteSchemaIdLongUTF.java} |   8 +-
 ...ecordInternalSerializerWriteSchemaLongUTF.java} |   8 +-
 9 files changed, 236 insertions(+), 34 deletions(-)

diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
index 22b7742913..0d758ace1b 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -34,6 +35,9 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.flink.util.SerializerHelper;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
 @Internal
@@ -43,18 +47,27 @@ class DynamicRecordInternalSerializer extends 
TypeSerializer<DynamicRecordIntern
 
   private final TableSerializerCache serializerCache;
   private final boolean writeSchemaAndSpec;
+  private final boolean writeLongUTF;
 
   DynamicRecordInternalSerializer(
       TableSerializerCache serializerCache, boolean writeSchemaAndSpec) {
+    this(serializerCache, writeSchemaAndSpec, true);
+  }
+
+  @VisibleForTesting
+  DynamicRecordInternalSerializer(
+      TableSerializerCache serializerCache, boolean writeSchemaAndSpec, 
boolean writeLongUTF) {
     this.serializerCache = serializerCache;
     this.writeSchemaAndSpec = writeSchemaAndSpec;
+    this.writeLongUTF = writeLongUTF;
   }
 
   @Override
   public TypeSerializer<DynamicRecordInternal> duplicate() {
     return new DynamicRecordInternalSerializer(
         new TableSerializerCache(serializerCache.catalogLoader(), 
serializerCache.maximumSize()),
-        writeSchemaAndSpec);
+        writeSchemaAndSpec,
+        writeLongUTF);
   }
 
   @Override
@@ -68,7 +81,12 @@ class DynamicRecordInternalSerializer extends 
TypeSerializer<DynamicRecordIntern
     dataOutputView.writeUTF(toSerialize.tableName());
     dataOutputView.writeUTF(toSerialize.branch());
     if (writeSchemaAndSpec) {
-      dataOutputView.writeUTF(SchemaParser.toJson(toSerialize.schema()));
+      if (writeLongUTF) {
+        SerializerHelper.writeLongUTF(dataOutputView, 
SchemaParser.toJson(toSerialize.schema()));
+      } else {
+        dataOutputView.writeUTF(SchemaParser.toJson(toSerialize.schema()));
+      }
+
       dataOutputView.writeUTF(PartitionSpecParser.toJson(toSerialize.spec()));
     } else {
       dataOutputView.writeInt(toSerialize.schema().schemaId());
@@ -108,7 +126,12 @@ class DynamicRecordInternalSerializer extends 
TypeSerializer<DynamicRecordIntern
     final PartitionSpec spec;
     final RowDataSerializer rowDataSerializer;
     if (writeSchemaAndSpec) {
-      schema = SchemaParser.fromJson(dataInputView.readUTF());
+      if (writeLongUTF) {
+        schema = 
SchemaParser.fromJson(SerializerHelper.readLongUTF(dataInputView));
+      } else {
+        schema = SchemaParser.fromJson(dataInputView.readUTF());
+      }
+
       spec = PartitionSpecParser.fromJson(schema, dataInputView.readUTF());
       rowDataSerializer = serializerCache.serializer(tableName, schema, spec);
     } else {
@@ -152,7 +175,12 @@ class DynamicRecordInternalSerializer extends 
TypeSerializer<DynamicRecordIntern
     final PartitionSpec spec;
     final RowDataSerializer rowDataSerializer;
     if (writeSchemaAndSpec) {
-      schema = SchemaParser.fromJson(dataInputView.readUTF());
+      if (writeLongUTF) {
+        schema = 
SchemaParser.fromJson(SerializerHelper.readLongUTF(dataInputView));
+      } else {
+        schema = SchemaParser.fromJson(dataInputView.readUTF());
+      }
+
       spec = PartitionSpecParser.fromJson(schema, dataInputView.readUTF());
       reuse.setSchema(schema);
       reuse.setSpec(spec);
@@ -245,25 +273,32 @@ class DynamicRecordInternalSerializer extends 
TypeSerializer<DynamicRecordIntern
 
   @Override
   public TypeSerializerSnapshot<DynamicRecordInternal> snapshotConfiguration() 
{
-    return new DynamicRecordInternalTypeSerializerSnapshot(writeSchemaAndSpec);
+    return new DynamicRecordInternalTypeSerializerSnapshot(writeSchemaAndSpec, 
serializerCache);
   }
 
   public static class DynamicRecordInternalTypeSerializerSnapshot
       implements TypeSerializerSnapshot<DynamicRecordInternal> {
 
+    private static final int MOST_RECENT_VERSION = 1;
+
     private boolean writeSchemaAndSpec;
+    private int version;
+    private TableSerializerCache serializerCache;
 
-    // Zero args constructor is required to instantiate this class on restore
+    // Zero args constructor is required to instantiate this class on restore 
via readSnapshot(..)
     @SuppressWarnings({"unused", "checkstyle:RedundantModifier"})
     public DynamicRecordInternalTypeSerializerSnapshot() {}
 
-    DynamicRecordInternalTypeSerializerSnapshot(boolean writeSchemaAndSpec) {
+    DynamicRecordInternalTypeSerializerSnapshot(
+        boolean writeSchemaAndSpec, TableSerializerCache serializerCache) {
       this.writeSchemaAndSpec = writeSchemaAndSpec;
+      this.serializerCache = serializerCache;
+      this.version = MOST_RECENT_VERSION;
     }
 
     @Override
     public int getCurrentVersion() {
-      return 0;
+      return version;
     }
 
     @Override
@@ -274,22 +309,62 @@ class DynamicRecordInternalSerializer extends 
TypeSerializer<DynamicRecordIntern
     @Override
     public void readSnapshot(int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader)
         throws IOException {
+      this.version = readVersion;
       this.writeSchemaAndSpec = in.readBoolean();
     }
 
     @Override
     public TypeSerializerSchemaCompatibility<DynamicRecordInternal> 
resolveSchemaCompatibility(
         TypeSerializerSnapshot<DynamicRecordInternal> oldSerializerSnapshot) {
-      return TypeSerializerSchemaCompatibility.compatibleAsIs();
+      if (oldSerializerSnapshot.getCurrentVersion() == getCurrentVersion()) {
+        return TypeSerializerSchemaCompatibility.compatibleAsIs();
+      }
+
+      // Old TypeSerializerSnapshots do not contain the serializer cache, but 
the newest one does.
+      // This will also ensure that we always use the up-to-date cache 
alongside with its catalog
+      // configuration.
+      Preconditions.checkNotNull(serializerCache, "serializerCache should not 
be null");
+      try {
+        DynMethods.builder("initializeSerializerCache")
+            .hiddenImpl(
+                DynamicRecordInternalTypeSerializerSnapshot.class, 
TableSerializerCache.class)
+            .build()
+            .invoke(oldSerializerSnapshot, serializerCache);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Failed to initialize serializerCache for reading data with old 
serializer", e);
+      }
+
+      // This will first read data with the old serializer, then switch to the 
most recent one.
+      return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
     }
 
     @Override
     public TypeSerializer<DynamicRecordInternal> restoreSerializer() {
-      // Note: We pass in a null serializer cache which would create issues if 
we tried to use this
-      // restored serializer, but since we are using {@code
-      // TypeSerializerSchemaCompatibility.compatibleAsIs()} above, this 
serializer will never be
-      // used. A new one will be created via {@code DynamicRecordInternalType}.
-      return new DynamicRecordInternalSerializer(null, writeSchemaAndSpec);
+      if (getCurrentVersion() < MOST_RECENT_VERSION) {
+        // If this serializer is not the most recent one, we need to read old 
data with the correct
+        // parameters.
+        return new DynamicRecordInternalSerializer(serializerCache, 
writeSchemaAndSpec, false);
+      }
+
+      // In all other cases, we just use the newest serializer.
+      return new DynamicRecordInternalSerializer(serializerCache, 
writeSchemaAndSpec, true);
+    }
+
+    /**
+     * We need to lazily initialize the cache from the up-to-date serializer 
which has the current
+     * CatalogLoader available.
+     *
+     * <p>This method must not be removed!
+     */
+    @SuppressWarnings("unused")
+    private void initializeSerializerCache(TableSerializerCache cache) {
+      this.serializerCache = cache;
     }
   }
+
+  @VisibleForTesting
+  TableSerializerCache getSerializerCache() {
+    return serializerCache;
+  }
 }
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index b6d6f60ef6..0cc1c633f9 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ScanTaskParser;
+import org.apache.iceberg.flink.util.SerializerHelper;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/SerializerHelper.java
similarity index 92%
rename from 
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java
rename to 
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/SerializerHelper.java
index 841969666e..3a161ea263 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/SerializerHelper.java
@@ -16,20 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.flink.source.split;
+package org.apache.iceberg.flink.util;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UTFDataFormatException;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
 
 /**
  * Helper class to serialize and deserialize strings longer than 65K. The 
inspiration is mostly
  * taken from the class 
org.apache.flink.core.memory.DataInputSerializer.readUTF and
  * org.apache.flink.core.memory.DataOutputSerializer.writeUTF.
  */
-class SerializerHelper implements Serializable {
+@Internal
+public class SerializerHelper implements Serializable {
 
   private SerializerHelper() {}
 
@@ -47,7 +51,7 @@ class SerializerHelper implements Serializable {
    * @param out the output stream to write the string to.
    * @param str the string value to be written.
    */
-  public static void writeLongUTF(DataOutputSerializer out, String str) throws 
IOException {
+  public static void writeLongUTF(DataOutputView out, String str) throws 
IOException {
     int strlen = str.length();
     long utflen = 0;
     int ch;
@@ -85,7 +89,7 @@ class SerializerHelper implements Serializable {
    * @return the string value read from the input stream.
    * @throws IOException if an I/O error occurs when reading from the input 
stream.
    */
-  public static String readLongUTF(DataInputDeserializer in) throws 
IOException {
+  public static String readLongUTF(DataInputView in) throws IOException {
     int utflen = in.readInt();
     byte[] bytearr = new byte[utflen];
     char[] chararr = new char[utflen];
@@ -168,8 +172,7 @@ class SerializerHelper implements Serializable {
     }
   }
 
-  private static void writeUTFBytes(DataOutputSerializer out, String str, int 
utflen)
-      throws IOException {
+  private static void writeUTFBytes(DataOutputView out, String str, int 
utflen) throws IOException {
     int strlen = str.length();
     int ch;
 
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java
index 30782e8d41..07096b891a 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java
@@ -55,15 +55,19 @@ abstract class DynamicRecordInternalSerializerTestBase
   static final PartitionSpec SPEC = 
PartitionSpec.builderFor(SCHEMA).bucket("id", 10).build();
 
   private boolean writeFullSchemaAndSpec;
+  private final boolean writeLongUTF;
 
-  DynamicRecordInternalSerializerTestBase(boolean writeFullSchemaAndSpec) {
+  DynamicRecordInternalSerializerTestBase(boolean writeFullSchemaAndSpec, 
boolean writeLongUTF) {
     this.writeFullSchemaAndSpec = writeFullSchemaAndSpec;
+    this.writeLongUTF = writeLongUTF;
   }
 
   @Override
   protected TypeSerializer<DynamicRecordInternal> createSerializer() {
     return new DynamicRecordInternalSerializer(
-        new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1), 
writeFullSchemaAndSpec);
+        new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1),
+        writeFullSchemaAndSpec,
+        writeLongUTF);
   }
 
   @BeforeEach
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializer.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializer.java
new file mode 100644
index 0000000000..388ff30333
--- /dev/null
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializer.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class TestDynamicRecordInternalSerializer {
+
+  @RegisterExtension
+  static final HadoopCatalogExtension CATALOG_EXTENSION = new 
HadoopCatalogExtension("db", "table");
+
+  @Test
+  void testCurrentTypeSerializerSnapshotVersion() {
+    TypeSerializer<DynamicRecordInternal> serializer = createSerializer();
+    
assertThat(serializer).isNotNull().isInstanceOf(DynamicRecordInternalSerializer.class);
+    TypeSerializerSnapshot<DynamicRecordInternal> snapshot = 
serializer.snapshotConfiguration();
+    assertThat(snapshot.getCurrentVersion()).isEqualTo(1);
+  }
+
+  @Test
+  void testCurrentTypeSerializerSnapshotCompatibility() {
+    TypeSerializer<DynamicRecordInternal> serializer = createSerializer();
+    
assertThat(serializer).isNotNull().isInstanceOf(DynamicRecordInternalSerializer.class);
+    TypeSerializerSnapshot<DynamicRecordInternal> snapshot = 
serializer.snapshotConfiguration();
+    assertThat(
+            snapshot
+                .resolveSchemaCompatibility(serializer.snapshotConfiguration())
+                .isCompatibleAsIs())
+        .isTrue();
+  }
+
+  @Test
+  void testRestoreFromOldVersion() throws IOException {
+    // Create a serialized snapshot of the TypeSerializer
+    final int oldVersion = 0;
+    OldTypeSerializerSnapshot oldTypeSerializerSnapshot = new 
OldTypeSerializerSnapshot(oldVersion);
+    
assertThat(oldTypeSerializerSnapshot.getCurrentVersion()).isEqualTo(oldVersion);
+    DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
+    oldTypeSerializerSnapshot.writeSnapshot(dataOutputSerializer);
+
+    // Load the serialized state
+    
DynamicRecordInternalSerializer.DynamicRecordInternalTypeSerializerSnapshot 
restoreSnapshot =
+        
(DynamicRecordInternalSerializer.DynamicRecordInternalTypeSerializerSnapshot)
+            createSerializer().snapshotConfiguration();
+    restoreSnapshot.readSnapshot(
+        oldVersion,
+        new DataInputDeserializer(dataOutputSerializer.getSharedBuffer()),
+        getClass().getClassLoader());
+    // Check that it matches the original one
+    assertThat(restoreSnapshot.getCurrentVersion()).isEqualTo(oldVersion);
+    assertThat(
+            restoreSnapshot
+                .resolveSchemaCompatibility(oldTypeSerializerSnapshot)
+                .isCompatibleAsIs())
+        .isTrue();
+    TypeSerializer<DynamicRecordInternal> restoreSerializer = 
restoreSnapshot.restoreSerializer();
+    
assertThat(restoreSerializer).isInstanceOf(DynamicRecordInternalSerializer.class);
+    assertThat(((DynamicRecordInternalSerializer) 
restoreSerializer).getSerializerCache())
+        .isNotNull();
+
+    // Compare against the latest version of a snapshot
+    TypeSerializerSnapshot<DynamicRecordInternal> latestVersion =
+        createSerializer().snapshotConfiguration();
+    assertThat(latestVersion.getCurrentVersion()).isEqualTo(1);
+    assertThat(
+            latestVersion
+                .resolveSchemaCompatibility(oldTypeSerializerSnapshot)
+                .isCompatibleAfterMigration())
+        .isTrue();
+    assertThat(
+            
latestVersion.resolveSchemaCompatibility(restoreSnapshot).isCompatibleAfterMigration())
+        .isTrue();
+  }
+
+  private DynamicRecordInternalSerializer createSerializer() {
+    return new DynamicRecordInternalSerializer(
+        new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1), true);
+  }
+
+  private static class OldTypeSerializerSnapshot
+      extends 
DynamicRecordInternalSerializer.DynamicRecordInternalTypeSerializerSnapshot {
+
+    private final int version;
+
+    OldTypeSerializerSnapshot(int version) {
+      this.version = version;
+    }
+
+    @Override
+    public int getCurrentVersion() {
+      return version;
+    }
+  }
+}
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
index ab8ce98c35..e7aa2d24d9 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
@@ -18,11 +18,11 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-/** Test writing DynamicRecord with the full schema */
+/** Test writing DynamicRecord with the full schema and standard UTF encoding 
*/
 class TestDynamicRecordInternalSerializerWriteSchema
     extends DynamicRecordInternalSerializerTestBase {
 
   TestDynamicRecordInternalSerializerWriteSchema() {
-    super(true);
+    super(true /* writeFullSchemaAndSpec */, false /* writeLongUTF */);
   }
 }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaId.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaId.java
index 1d88905462..bff0fd5c6a 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaId.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaId.java
@@ -18,11 +18,11 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-/** Test writing DynamicRecord with only the schema id. */
+/** Test writing DynamicRecord with only the schema id and standard UTF 
encoding */
 class TestDynamicRecordInternalSerializerWriteSchemaId
     extends DynamicRecordInternalSerializerTestBase {
 
   TestDynamicRecordInternalSerializerWriteSchemaId() {
-    super(false);
+    super(false /* writeFullSchemaAndSpec */, false /* writeLongUTF */);
   }
 }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaIdLongUTF.java
similarity index 76%
copy from 
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
copy to 
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaIdLongUTF.java
index ab8ce98c35..7a1ae3df38 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaIdLongUTF.java
@@ -18,11 +18,11 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-/** Test writing DynamicRecord with the full schema */
-class TestDynamicRecordInternalSerializerWriteSchema
+/** Test writing DynamicRecord with only the schema id and long UTF encoding */
+class TestDynamicRecordInternalSerializerWriteSchemaIdLongUTF
     extends DynamicRecordInternalSerializerTestBase {
 
-  TestDynamicRecordInternalSerializerWriteSchema() {
-    super(true);
+  TestDynamicRecordInternalSerializerWriteSchemaIdLongUTF() {
+    super(false /* writeFullSchemaAndSpec */, true /* writeLongUTF */);
   }
 }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaLongUTF.java
similarity index 77%
copy from 
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
copy to 
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaLongUTF.java
index ab8ce98c35..faff8921db 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaLongUTF.java
@@ -18,11 +18,11 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-/** Test writing DynamicRecord with the full schema */
-class TestDynamicRecordInternalSerializerWriteSchema
+/** Test writing DynamicRecord with the full schema and long UTF encoding */
+class TestDynamicRecordInternalSerializerWriteSchemaLongUTF
     extends DynamicRecordInternalSerializerTestBase {
 
-  TestDynamicRecordInternalSerializerWriteSchema() {
-    super(true);
+  TestDynamicRecordInternalSerializerWriteSchemaLongUTF() {
+    super(true /* writeFullSchemaAndSpec */, true /* writeLongUTF */);
   }
 }

Reply via email to