This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 3048d772ae Flink: Dynamic Sink: Fix serialization issues with schemas
larger than 2^16 bytes (#14880)
3048d772ae is described below
commit 3048d772aed6572bce28abbddae661d441058f74
Author: Maximilian Michels <[email protected]>
AuthorDate: Mon Jan 5 14:22:39 2026 +0100
Flink: Dynamic Sink: Fix serialization issues with schemas larger than 2^16
bytes (#14880)
---
.../dynamic/DynamicRecordInternalSerializer.java | 103 +++++++++++++++---
.../flink/source/split/IcebergSourceSplit.java | 1 +
.../{source/split => util}/SerializerHelper.java | 15 +--
.../DynamicRecordInternalSerializerTestBase.java | 8 +-
.../TestDynamicRecordInternalSerializer.java | 119 +++++++++++++++++++++
...DynamicRecordInternalSerializerWriteSchema.java | 4 +-
...namicRecordInternalSerializerWriteSchemaId.java | 4 +-
...ordInternalSerializerWriteSchemaIdLongUTF.java} | 8 +-
...ecordInternalSerializerWriteSchemaLongUTF.java} | 8 +-
9 files changed, 236 insertions(+), 34 deletions(-)
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
index 22b7742913..0d758ace1b 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -34,6 +35,9 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.flink.util.SerializerHelper;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@Internal
@@ -43,18 +47,27 @@ class DynamicRecordInternalSerializer extends
TypeSerializer<DynamicRecordIntern
private final TableSerializerCache serializerCache;
private final boolean writeSchemaAndSpec;
+ private final boolean writeLongUTF;
DynamicRecordInternalSerializer(
TableSerializerCache serializerCache, boolean writeSchemaAndSpec) {
+ this(serializerCache, writeSchemaAndSpec, true);
+ }
+
+ @VisibleForTesting
+ DynamicRecordInternalSerializer(
+ TableSerializerCache serializerCache, boolean writeSchemaAndSpec,
boolean writeLongUTF) {
this.serializerCache = serializerCache;
this.writeSchemaAndSpec = writeSchemaAndSpec;
+ this.writeLongUTF = writeLongUTF;
}
@Override
public TypeSerializer<DynamicRecordInternal> duplicate() {
return new DynamicRecordInternalSerializer(
new TableSerializerCache(serializerCache.catalogLoader(),
serializerCache.maximumSize()),
- writeSchemaAndSpec);
+ writeSchemaAndSpec,
+ writeLongUTF);
}
@Override
@@ -68,7 +81,12 @@ class DynamicRecordInternalSerializer extends
TypeSerializer<DynamicRecordIntern
dataOutputView.writeUTF(toSerialize.tableName());
dataOutputView.writeUTF(toSerialize.branch());
if (writeSchemaAndSpec) {
- dataOutputView.writeUTF(SchemaParser.toJson(toSerialize.schema()));
+ if (writeLongUTF) {
+ SerializerHelper.writeLongUTF(dataOutputView,
SchemaParser.toJson(toSerialize.schema()));
+ } else {
+ dataOutputView.writeUTF(SchemaParser.toJson(toSerialize.schema()));
+ }
+
dataOutputView.writeUTF(PartitionSpecParser.toJson(toSerialize.spec()));
} else {
dataOutputView.writeInt(toSerialize.schema().schemaId());
@@ -108,7 +126,12 @@ class DynamicRecordInternalSerializer extends
TypeSerializer<DynamicRecordIntern
final PartitionSpec spec;
final RowDataSerializer rowDataSerializer;
if (writeSchemaAndSpec) {
- schema = SchemaParser.fromJson(dataInputView.readUTF());
+ if (writeLongUTF) {
+ schema =
SchemaParser.fromJson(SerializerHelper.readLongUTF(dataInputView));
+ } else {
+ schema = SchemaParser.fromJson(dataInputView.readUTF());
+ }
+
spec = PartitionSpecParser.fromJson(schema, dataInputView.readUTF());
rowDataSerializer = serializerCache.serializer(tableName, schema, spec);
} else {
@@ -152,7 +175,12 @@ class DynamicRecordInternalSerializer extends
TypeSerializer<DynamicRecordIntern
final PartitionSpec spec;
final RowDataSerializer rowDataSerializer;
if (writeSchemaAndSpec) {
- schema = SchemaParser.fromJson(dataInputView.readUTF());
+ if (writeLongUTF) {
+ schema =
SchemaParser.fromJson(SerializerHelper.readLongUTF(dataInputView));
+ } else {
+ schema = SchemaParser.fromJson(dataInputView.readUTF());
+ }
+
spec = PartitionSpecParser.fromJson(schema, dataInputView.readUTF());
reuse.setSchema(schema);
reuse.setSpec(spec);
@@ -245,25 +273,32 @@ class DynamicRecordInternalSerializer extends
TypeSerializer<DynamicRecordIntern
@Override
public TypeSerializerSnapshot<DynamicRecordInternal> snapshotConfiguration()
{
- return new DynamicRecordInternalTypeSerializerSnapshot(writeSchemaAndSpec);
+ return new DynamicRecordInternalTypeSerializerSnapshot(writeSchemaAndSpec,
serializerCache);
}
public static class DynamicRecordInternalTypeSerializerSnapshot
implements TypeSerializerSnapshot<DynamicRecordInternal> {
+ private static final int MOST_RECENT_VERSION = 1;
+
private boolean writeSchemaAndSpec;
+ private int version;
+ private TableSerializerCache serializerCache;
- // Zero args constructor is required to instantiate this class on restore
+ // Zero args constructor is required to instantiate this class on restore
via readSnapshot(..)
@SuppressWarnings({"unused", "checkstyle:RedundantModifier"})
public DynamicRecordInternalTypeSerializerSnapshot() {}
- DynamicRecordInternalTypeSerializerSnapshot(boolean writeSchemaAndSpec) {
+ DynamicRecordInternalTypeSerializerSnapshot(
+ boolean writeSchemaAndSpec, TableSerializerCache serializerCache) {
this.writeSchemaAndSpec = writeSchemaAndSpec;
+ this.serializerCache = serializerCache;
+ this.version = MOST_RECENT_VERSION;
}
@Override
public int getCurrentVersion() {
- return 0;
+ return version;
}
@Override
@@ -274,22 +309,62 @@ class DynamicRecordInternalSerializer extends
TypeSerializer<DynamicRecordIntern
@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader
userCodeClassLoader)
throws IOException {
+ this.version = readVersion;
this.writeSchemaAndSpec = in.readBoolean();
}
@Override
public TypeSerializerSchemaCompatibility<DynamicRecordInternal>
resolveSchemaCompatibility(
TypeSerializerSnapshot<DynamicRecordInternal> oldSerializerSnapshot) {
- return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ if (oldSerializerSnapshot.getCurrentVersion() == getCurrentVersion()) {
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
+
+ // Old TypeSerializerSnapshots do not contain the serializer cache, but
the newest one does.
+ // This will also ensure that we always use the up-to-date cache
alongside with its catalog
+ // configuration.
+ Preconditions.checkNotNull(serializerCache, "serializerCache should not
be null");
+ try {
+ DynMethods.builder("initializeSerializerCache")
+ .hiddenImpl(
+ DynamicRecordInternalTypeSerializerSnapshot.class,
TableSerializerCache.class)
+ .build()
+ .invoke(oldSerializerSnapshot, serializerCache);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to initialize serializerCache for reading data with old
serializer", e);
+ }
+
+ // This will first read data with the old serializer, then switch to the
most recent one.
+ return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
}
@Override
public TypeSerializer<DynamicRecordInternal> restoreSerializer() {
- // Note: We pass in a null serializer cache which would create issues if
we tried to use this
- // restored serializer, but since we are using {@code
- // TypeSerializerSchemaCompatibility.compatibleAsIs()} above, this
serializer will never be
- // used. A new one will be created via {@code DynamicRecordInternalType}.
- return new DynamicRecordInternalSerializer(null, writeSchemaAndSpec);
+ if (getCurrentVersion() < MOST_RECENT_VERSION) {
+ // If this serializer is not the most recent one, we need to read old
data with the correct
+ // parameters.
+ return new DynamicRecordInternalSerializer(serializerCache,
writeSchemaAndSpec, false);
+ }
+
+ // In all other cases, we just use the newest serializer.
+ return new DynamicRecordInternalSerializer(serializerCache,
writeSchemaAndSpec, true);
+ }
+
+ /**
+ * We need to lazily initialize the cache from the up-to-date serializer
which has the current
+ * CatalogLoader available.
+ *
+ * <p>This method must not be removed!
+ */
+ @SuppressWarnings("unused")
+ private void initializeSerializerCache(TableSerializerCache cache) {
+ this.serializerCache = cache;
}
}
+
+ @VisibleForTesting
+ TableSerializerCache getSerializerCache() {
+ return serializerCache;
+ }
}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index b6d6f60ef6..0cc1c633f9 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ScanTaskParser;
+import org.apache.iceberg.flink.util.SerializerHelper;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/SerializerHelper.java
similarity index 92%
rename from
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java
rename to
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/SerializerHelper.java
index 841969666e..3a161ea263 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/SerializerHelper.java
@@ -16,20 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.flink.source.split;
+package org.apache.iceberg.flink.util;
import java.io.IOException;
import java.io.Serializable;
import java.io.UTFDataFormatException;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
/**
* Helper class to serialize and deserialize strings longer than 65K. The
inspiration is mostly
* taken from the class
org.apache.flink.core.memory.DataInputSerializer.readUTF and
* org.apache.flink.core.memory.DataOutputSerializer.writeUTF.
*/
-class SerializerHelper implements Serializable {
+@Internal
+public class SerializerHelper implements Serializable {
private SerializerHelper() {}
@@ -47,7 +51,7 @@ class SerializerHelper implements Serializable {
* @param out the output stream to write the string to.
* @param str the string value to be written.
*/
- public static void writeLongUTF(DataOutputSerializer out, String str) throws
IOException {
+ public static void writeLongUTF(DataOutputView out, String str) throws
IOException {
int strlen = str.length();
long utflen = 0;
int ch;
@@ -85,7 +89,7 @@ class SerializerHelper implements Serializable {
* @return the string value read from the input stream.
* @throws IOException if an I/O error occurs when reading from the input
stream.
*/
- public static String readLongUTF(DataInputDeserializer in) throws
IOException {
+ public static String readLongUTF(DataInputView in) throws IOException {
int utflen = in.readInt();
byte[] bytearr = new byte[utflen];
char[] chararr = new char[utflen];
@@ -168,8 +172,7 @@ class SerializerHelper implements Serializable {
}
}
- private static void writeUTFBytes(DataOutputSerializer out, String str, int
utflen)
- throws IOException {
+ private static void writeUTFBytes(DataOutputView out, String str, int
utflen) throws IOException {
int strlen = str.length();
int ch;
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java
index 30782e8d41..07096b891a 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java
@@ -55,15 +55,19 @@ abstract class DynamicRecordInternalSerializerTestBase
static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).bucket("id", 10).build();
private boolean writeFullSchemaAndSpec;
+ private final boolean writeLongUTF;
- DynamicRecordInternalSerializerTestBase(boolean writeFullSchemaAndSpec) {
+ DynamicRecordInternalSerializerTestBase(boolean writeFullSchemaAndSpec,
boolean writeLongUTF) {
this.writeFullSchemaAndSpec = writeFullSchemaAndSpec;
+ this.writeLongUTF = writeLongUTF;
}
@Override
protected TypeSerializer<DynamicRecordInternal> createSerializer() {
return new DynamicRecordInternalSerializer(
- new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1),
writeFullSchemaAndSpec);
+ new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1),
+ writeFullSchemaAndSpec,
+ writeLongUTF);
}
@BeforeEach
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializer.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializer.java
new file mode 100644
index 0000000000..388ff30333
--- /dev/null
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializer.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class TestDynamicRecordInternalSerializer {
+
+ @RegisterExtension
+ static final HadoopCatalogExtension CATALOG_EXTENSION = new
HadoopCatalogExtension("db", "table");
+
+ @Test
+ void testCurrentTypeSerializerSnapshotVersion() {
+ TypeSerializer<DynamicRecordInternal> serializer = createSerializer();
+
assertThat(serializer).isNotNull().isInstanceOf(DynamicRecordInternalSerializer.class);
+ TypeSerializerSnapshot<DynamicRecordInternal> snapshot =
serializer.snapshotConfiguration();
+ assertThat(snapshot.getCurrentVersion()).isEqualTo(1);
+ }
+
+ @Test
+ void testCurrentTypeSerializerSnapshotCompatibility() {
+ TypeSerializer<DynamicRecordInternal> serializer = createSerializer();
+
assertThat(serializer).isNotNull().isInstanceOf(DynamicRecordInternalSerializer.class);
+ TypeSerializerSnapshot<DynamicRecordInternal> snapshot =
serializer.snapshotConfiguration();
+ assertThat(
+ snapshot
+ .resolveSchemaCompatibility(serializer.snapshotConfiguration())
+ .isCompatibleAsIs())
+ .isTrue();
+ }
+
+ @Test
+ void testRestoreFromOldVersion() throws IOException {
+ // Create a serialized snapshot of the TypeSerializer
+ final int oldVersion = 0;
+ OldTypeSerializerSnapshot oldTypeSerializerSnapshot = new
OldTypeSerializerSnapshot(oldVersion);
+
assertThat(oldTypeSerializerSnapshot.getCurrentVersion()).isEqualTo(oldVersion);
+ DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
+ oldTypeSerializerSnapshot.writeSnapshot(dataOutputSerializer);
+
+ // Load the serialized state
+
DynamicRecordInternalSerializer.DynamicRecordInternalTypeSerializerSnapshot
restoreSnapshot =
+
(DynamicRecordInternalSerializer.DynamicRecordInternalTypeSerializerSnapshot)
+ createSerializer().snapshotConfiguration();
+ restoreSnapshot.readSnapshot(
+ oldVersion,
+ new DataInputDeserializer(dataOutputSerializer.getSharedBuffer()),
+ getClass().getClassLoader());
+ // Check that it matches the original one
+ assertThat(restoreSnapshot.getCurrentVersion()).isEqualTo(oldVersion);
+ assertThat(
+ restoreSnapshot
+ .resolveSchemaCompatibility(oldTypeSerializerSnapshot)
+ .isCompatibleAsIs())
+ .isTrue();
+ TypeSerializer<DynamicRecordInternal> restoreSerializer =
restoreSnapshot.restoreSerializer();
+
assertThat(restoreSerializer).isInstanceOf(DynamicRecordInternalSerializer.class);
+ assertThat(((DynamicRecordInternalSerializer)
restoreSerializer).getSerializerCache())
+ .isNotNull();
+
+ // Compare against the latest version of a snapshot
+ TypeSerializerSnapshot<DynamicRecordInternal> latestVersion =
+ createSerializer().snapshotConfiguration();
+ assertThat(latestVersion.getCurrentVersion()).isEqualTo(1);
+ assertThat(
+ latestVersion
+ .resolveSchemaCompatibility(oldTypeSerializerSnapshot)
+ .isCompatibleAfterMigration())
+ .isTrue();
+ assertThat(
+
latestVersion.resolveSchemaCompatibility(restoreSnapshot).isCompatibleAfterMigration())
+ .isTrue();
+ }
+
+ private DynamicRecordInternalSerializer createSerializer() {
+ return new DynamicRecordInternalSerializer(
+ new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1), true);
+ }
+
+ private static class OldTypeSerializerSnapshot
+ extends
DynamicRecordInternalSerializer.DynamicRecordInternalTypeSerializerSnapshot {
+
+ private final int version;
+
+ OldTypeSerializerSnapshot(int version) {
+ this.version = version;
+ }
+
+ @Override
+ public int getCurrentVersion() {
+ return version;
+ }
+ }
+}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
index ab8ce98c35..e7aa2d24d9 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
@@ -18,11 +18,11 @@
*/
package org.apache.iceberg.flink.sink.dynamic;
-/** Test writing DynamicRecord with the full schema */
+/** Test writing DynamicRecord with the full schema and standard UTF encoding
*/
class TestDynamicRecordInternalSerializerWriteSchema
extends DynamicRecordInternalSerializerTestBase {
TestDynamicRecordInternalSerializerWriteSchema() {
- super(true);
+ super(true /* writeFullSchemaAndSpec */, false /* writeLongUTF */);
}
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaId.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaId.java
index 1d88905462..bff0fd5c6a 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaId.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaId.java
@@ -18,11 +18,11 @@
*/
package org.apache.iceberg.flink.sink.dynamic;
-/** Test writing DynamicRecord with only the schema id. */
+/** Test writing DynamicRecord with only the schema id and standard UTF
encoding */
class TestDynamicRecordInternalSerializerWriteSchemaId
extends DynamicRecordInternalSerializerTestBase {
TestDynamicRecordInternalSerializerWriteSchemaId() {
- super(false);
+ super(false /* writeFullSchemaAndSpec */, false /* writeLongUTF */);
}
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaIdLongUTF.java
similarity index 76%
copy from
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
copy to
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaIdLongUTF.java
index ab8ce98c35..7a1ae3df38 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaIdLongUTF.java
@@ -18,11 +18,11 @@
*/
package org.apache.iceberg.flink.sink.dynamic;
-/** Test writing DynamicRecord with the full schema */
-class TestDynamicRecordInternalSerializerWriteSchema
+/** Test writing DynamicRecord with only the schema id and long UTF encoding */
+class TestDynamicRecordInternalSerializerWriteSchemaIdLongUTF
extends DynamicRecordInternalSerializerTestBase {
- TestDynamicRecordInternalSerializerWriteSchema() {
- super(true);
+ TestDynamicRecordInternalSerializerWriteSchemaIdLongUTF() {
+ super(false /* writeFullSchemaAndSpec */, true /* writeLongUTF */);
}
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaLongUTF.java
similarity index 77%
copy from
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
copy to
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaLongUTF.java
index ab8ce98c35..faff8921db 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaLongUTF.java
@@ -18,11 +18,11 @@
*/
package org.apache.iceberg.flink.sink.dynamic;
-/** Test writing DynamicRecord with the full schema */
-class TestDynamicRecordInternalSerializerWriteSchema
+/** Test writing DynamicRecord with the full schema and long UTF encoding */
+class TestDynamicRecordInternalSerializerWriteSchemaLongUTF
extends DynamicRecordInternalSerializerTestBase {
- TestDynamicRecordInternalSerializerWriteSchema() {
- super(true);
+ TestDynamicRecordInternalSerializerWriteSchemaLongUTF() {
+ super(true /* writeFullSchemaAndSpec */, true /* writeLongUTF */);
}
}