This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4d88ecfe924 [FLINK-39084][core] variant accessor methods for handling
map or array types in converters
4d88ecfe924 is described below
commit 4d88ecfe924b3704c10d098c1ef45b5d55351e93
Author: Swapna Marru <[email protected]>
AuthorDate: Mon Feb 23 23:35:46 2026 -0800
[FLINK-39084][core] variant accessor methods for handling map or array
types in converters
---
.../apache/flink/types/variant/BinaryVariant.java | 23 ++++++++++++++++++++++
.../org/apache/flink/types/variant/Variant.java | 18 +++++++++++++++++
.../flink/types/variant/BinaryVariantTest.java | 5 +++++
3 files changed, 46 insertions(+)
diff --git a/flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariant.java b/flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
index 2e8f80f0c2c..ce5664d34e3 100644
--- a/flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
+++ b/flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
@@ -32,8 +32,10 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
+import java.util.List;
import java.util.Objects;
import static org.apache.flink.types.variant.BinaryVariantUtil.BINARY_SEARCH_THRESHOLD;
@@ -242,11 +244,32 @@ public final class BinaryVariant implements Variant {
return getElementAtIndex(index);
}
+ @Override
+ public int getArraySize() throws VariantTypeException {
+ return handleArray(value, pos, (size, offsetSize, offsetStart, dataStart) -> size);
+ }
+
@Override
public Variant getField(String fieldName) throws VariantTypeException {
return getFieldByKey(fieldName);
}
+ @Override
+ public List<String> getFieldNames() throws VariantTypeException {
+ return handleObject(
+ value,
+ pos,
+ (size, idSize, offsetSize, idStart, offsetStart, dataStart) -> {
+ List<String> fieldNames = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ int id = readUnsigned(value, idStart + idSize * i, idSize);
+ String fieldName = getMetadataKey(metadata, id);
+ fieldNames.add(fieldName);
+ }
+ return fieldNames;
+ });
+ }
+
@Override
public String toJson() {
StringBuilder sb = new StringBuilder();
diff --git a/flink-core/src/main/java/org/apache/flink/types/variant/Variant.java b/flink-core/src/main/java/org/apache/flink/types/variant/Variant.java
index 6d6753c0406..c0f15788f3f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/variant/Variant.java
+++ b/flink-core/src/main/java/org/apache/flink/types/variant/Variant.java
@@ -24,6 +24,7 @@ import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.util.List;
/** Variant represent a semi-structured data. */
@PublicEvolving
@@ -175,6 +176,14 @@ public interface Variant {
*/
Variant getElement(int index) throws VariantTypeException;
+ /**
+ * Get the size of an array variant.
+ *
+ * @return Number of elements if this variant is an array
+ * @throws VariantTypeException If this variant is not an array.
+ */
+ int getArraySize() throws VariantTypeException;
+
/**
* Access value of the specified field of an object variant. If there is no field with the
* specified name, null is returned.
@@ -186,6 +195,15 @@ public interface Variant {
*/
Variant getField(String fieldName) throws VariantTypeException;
+ /**
+ * Get the field names of an object variant only at top level. Doesn't include the nested
+ * fields.
+ *
+ * @return List of field names if this variant is an object
+ * @throws VariantTypeException If this variant is not an object.
+ */
+ List<String> getFieldNames() throws VariantTypeException;
+
/** Parses the variant to json. */
String toJson();
diff --git a/flink-core/src/test/java/org/apache/flink/types/variant/BinaryVariantTest.java b/flink-core/src/test/java/org/apache/flink/types/variant/BinaryVariantTest.java
index 9b52bb328c1..83896ec53e1 100644
--- a/flink-core/src/test/java/org/apache/flink/types/variant/BinaryVariantTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/variant/BinaryVariantTest.java
@@ -113,6 +113,7 @@ class BinaryVariantTest {
assertThat(variant.isPrimitive()).isFalse();
assertThat(variant.isObject()).isFalse();
assertThat(variant.getType()).isEqualTo(Variant.Type.ARRAY);
+ assertThat(variant.getArraySize()).isEqualTo(5);
assertThat(variant.getElement(-1)).isNull();
assertThat(variant.getElement(0).getInt()).isEqualTo(1);
@@ -144,12 +145,16 @@ class BinaryVariantTest {
assertThat(variant.isPrimitive()).isFalse();
assertThat(variant.isObject()).isTrue();
assertThat(variant.getType()).isEqualTo(Variant.Type.OBJECT);
+ assertThat(variant.getFieldNames()).containsExactlyInAnyOrder("list", "object", "bb");
assertThat(variant.getField("list").isArray()).isTrue();
+ assertThat(variant.getField("list").getArraySize()).isEqualTo(2);
assertThat(variant.getField("list").getElement(0).getString()).isEqualTo("hello");
assertThat(variant.getField("list").getElement(1).getInt()).isEqualTo(1);
assertThat(variant.getField("object").isObject()).isTrue();
+ assertThat(variant.getField("object").getFieldNames())
+ .containsExactlyInAnyOrder("ss", "ff");
assertThat(variant.getField("object").getField("ss").getShort()).isEqualTo((short) 1);
assertThat(variant.getField("object").getField("ff").getFloat()).isEqualTo((10.0f));