emkornfield commented on code in PR #15283:
URL: https://github.com/apache/iceberg/pull/15283#discussion_r2789593303
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -464,6 +474,119 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) {
return convertLocalDateTime(value);
}
+ protected Variant convertVariantValue(Object value) {
Review Comment:
Is there a reason this is protected? If it is intended to be overridden, it
would be good to document the contract that implementors must follow (and what
the default implementation does).
I guess other methods in this class don't do that, but it seems like maybe we
should start?
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -464,6 +474,119 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) {
return convertLocalDateTime(value);
}
+ protected Variant convertVariantValue(Object value) {
+ if (value instanceof Variant) {
+ return (Variant) value;
+ } else if (value instanceof ByteBuffer) {
+ return Variant.from((ByteBuffer) value);
+ }
+
+ List<String> allFieldNames =
+ collectFieldNames(value).stream().distinct().sorted().collect(Collectors.toList());
+ VariantMetadata metadata =
+ allFieldNames.isEmpty() ? Variants.emptyMetadata() : Variants.metadata(allFieldNames);
+ VariantValue variantValue = objectToVariantValue(value, metadata);
+ return Variant.of(metadata, variantValue);
+ }
+
+ /**
+ * Collects all field names (map keys) from the entire object tree. Used to build a single
+ * VariantMetadata for the whole Variant (required for nested maps).
+ */
+ private static List<String> collectFieldNames(Object value) {
+ if (value == null) {
+ return Lists.newArrayList();
+ }
+ if (value instanceof Map) {
+ Map<?, ?> map = (Map<?, ?>) value;
+ List<String> names = Lists.newArrayList();
+ map.keySet().stream().map(Object::toString).forEach(names::add);
Review Comment:
Why not iterate only once, visiting the entry set in the loop below so each
key and its value are handled together?
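A minimal sketch of the single-pass version, assuming the same contract as the PR's `collectFieldNames` (keys are stringified with `toString()`, while scalars and `null` contribute nothing). It uses `java.util.ArrayList` instead of the relocated Guava `Lists` helper only so that it compiles on its own; the class name `EntrySetFieldNames` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical standalone class used only to illustrate the suggestion.
final class EntrySetFieldNames {
  private EntrySetFieldNames() {}

  // Walks each map once: the key is recorded and its value is recursed into
  // from the same entry, instead of separate passes over keySet() and values().
  static List<String> collectFieldNames(Object value) {
    List<String> names = new ArrayList<>();
    if (value instanceof Map) {
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
        names.add(entry.getKey().toString());
        names.addAll(collectFieldNames(entry.getValue()));
      }
    } else if (value instanceof Collection) {
      for (Object element : (Collection<?>) value) {
        names.addAll(collectFieldNames(element));
      }
    }
    // null and scalar values contribute no field names.
    return names;
  }
}
```

Duplicates are still returned here, so the caller's `distinct().sorted()` step stays as it is.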
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -464,6 +474,119 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) {
return convertLocalDateTime(value);
}
+ protected Variant convertVariantValue(Object value) {
Review Comment:
I'm not sure if it is generally done in this project, but it might be good to
consider protecting against stack overflow in the recursion (e.g., by limiting
the nesting depth).
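One way to do this is to thread a depth counter through the recursion and fail with a clear error before the JVM stack is exhausted. To stay self-contained, the sketch below only walks the structure and returns the deepest nesting level. The limit of 1000 and the names `DepthGuard`/`MAX_DEPTH` are hypothetical. In the PR, the same `depth` parameter would presumably be threaded through both `collectFieldNames` and `objectToVariantValue`.

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical standalone class used only to illustrate a depth guard.
final class DepthGuard {
  // Hypothetical limit; whether it should be configurable is a design choice.
  static final int MAX_DEPTH = 1000;

  private DepthGuard() {}

  // Returns the deepest nesting level, failing fast on overly deep (or cyclic) input.
  static int maxDepth(Object value) {
    return maxDepth(value, 0);
  }

  private static int maxDepth(Object value, int depth) {
    if (depth > MAX_DEPTH) {
      throw new IllegalArgumentException(
          "Cannot convert to variant: nesting exceeds " + MAX_DEPTH + " levels");
    }
    int deepest = depth;
    if (value instanceof Map) {
      for (Object nested : ((Map<?, ?>) value).values()) {
        deepest = Math.max(deepest, maxDepth(nested, depth + 1));
      }
    } else if (value instanceof Collection) {
      for (Object element : (Collection<?>) value) {
        deepest = Math.max(deepest, maxDepth(element, depth + 1));
      }
    }
    return deepest;
  }
}
```

A side benefit is that a collection that contains itself ends in an `IllegalArgumentException` instead of a `StackOverflowError`.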
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -464,6 +474,119 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) {
return convertLocalDateTime(value);
}
+ protected Variant convertVariantValue(Object value) {
+ if (value instanceof Variant) {
+ return (Variant) value;
+ } else if (value instanceof ByteBuffer) {
+ return Variant.from((ByteBuffer) value);
+ }
+
+ List<String> allFieldNames =
+ collectFieldNames(value).stream().distinct().sorted().collect(Collectors.toList());
+ VariantMetadata metadata =
+ allFieldNames.isEmpty() ? Variants.emptyMetadata() : Variants.metadata(allFieldNames);
+ VariantValue variantValue = objectToVariantValue(value, metadata);
+ return Variant.of(metadata, variantValue);
+ }
+
+ /**
+ * Collects all field names (map keys) from the entire object tree. Used to build a single
+ * VariantMetadata for the whole Variant (required for nested maps).
+ */
+ private static List<String> collectFieldNames(Object value) {
+ if (value == null) {
+ return Lists.newArrayList();
Review Comment:
nit: use `List.of()`?
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -464,6 +474,119 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) {
return convertLocalDateTime(value);
}
+ protected Variant convertVariantValue(Object value) {
+ if (value instanceof Variant) {
+ return (Variant) value;
+ } else if (value instanceof ByteBuffer) {
+ return Variant.from((ByteBuffer) value);
+ }
+
+ List<String> allFieldNames =
+ collectFieldNames(value).stream().distinct().sorted().collect(Collectors.toList());
+ VariantMetadata metadata =
+ allFieldNames.isEmpty() ? Variants.emptyMetadata() : Variants.metadata(allFieldNames);
+ VariantValue variantValue = objectToVariantValue(value, metadata);
+ return Variant.of(metadata, variantValue);
+ }
+
+ /**
+ * Collects all field names (map keys) from the entire object tree. Used to build a single
+ * VariantMetadata for the whole Variant (required for nested maps).
+ */
+ private static List<String> collectFieldNames(Object value) {
Review Comment:
It seems like you could avoid some object churn by passing a Set down through
the recursion and adding elements to it as you go?
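A minimal sketch of that idea, assuming the caller only needs the distinct, sorted key set. A single `TreeSet` is created at the top and filled in place by the recursion, which would also make the `distinct().sorted()` step in `convertVariantValue` unnecessary. The name `SharedSetFieldNames` is hypothetical. The resulting names would still need to be handed to whatever builds the `VariantMetadata`.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical standalone class used only to illustrate the suggestion.
final class SharedSetFieldNames {
  private SharedSetFieldNames() {}

  // Entry point: one TreeSet both de-duplicates and sorts the names.
  static SortedSet<String> collectFieldNames(Object value) {
    SortedSet<String> names = new TreeSet<>();
    collectInto(value, names);
    return names;
  }

  // Recursive helper: adds into the shared set instead of allocating and
  // merging a new list at every level.
  private static void collectInto(Object value, Set<String> names) {
    if (value instanceof Map) {
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
        names.add(entry.getKey().toString());
        collectInto(entry.getValue(), names);
      }
    } else if (value instanceof Collection) {
      for (Object element : (Collection<?>) value) {
        collectInto(element, names);
      }
    }
    // null and scalar values contribute no field names.
  }
}
```

Iterating the entry set here also covers the earlier suggestion to visit keys and values in one pass.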
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -464,6 +474,119 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) {
return convertLocalDateTime(value);
}
+ protected Variant convertVariantValue(Object value) {
+ if (value instanceof Variant) {
+ return (Variant) value;
+ } else if (value instanceof ByteBuffer) {
+ return Variant.from((ByteBuffer) value);
+ }
+
+ List<String> allFieldNames =
+ collectFieldNames(value).stream().distinct().sorted().collect(Collectors.toList());
+ VariantMetadata metadata =
+ allFieldNames.isEmpty() ? Variants.emptyMetadata() : Variants.metadata(allFieldNames);
+ VariantValue variantValue = objectToVariantValue(value, metadata);
+ return Variant.of(metadata, variantValue);
+ }
+
+ /**
+ * Collects all field names (map keys) from the entire object tree. Used to build a single
+ * VariantMetadata for the whole Variant (required for nested maps).
+ */
+ private static List<String> collectFieldNames(Object value) {
+ if (value == null) {
+ return Lists.newArrayList();
+ }
+ if (value instanceof Map) {
+ Map<?, ?> map = (Map<?, ?>) value;
+ List<String> names = Lists.newArrayList();
+ map.keySet().stream().map(Object::toString).forEach(names::add);
+ for (Object v : map.values()) {
+ names.addAll(collectFieldNames(v));
+ }
+ return names;
+ }
+ if (value instanceof Collection) {
+ List<String> names = Lists.newArrayList();
+ for (Object element : (Collection<?>) value) {
+ names.addAll(collectFieldNames(element));
+ }
+ return names;
+ }
+ return Lists.newArrayList();
+ }
+
+ /**
+ * Recursively converts a Java object to a VariantValue using the given shared metadata for all
+ * nested maps. Handles primitives, List (array), and Map (object); map keys become field names.
+ */
+ private static VariantValue objectToVariantValue(Object value, VariantMetadata metadata) {
+ if (value == null) {
+ return Variants.ofNull();
+ }
+ if (value instanceof Boolean) {
+ return Variants.of((Boolean) value);
+ }
+ if (value instanceof Number) {
+ return numberToVariantValue((Number) value);
+ }
+ if (value instanceof String) {
+ return Variants.of((String) value);
+ }
+ if (value instanceof ByteBuffer) {
+ return Variants.of((ByteBuffer) value);
+ }
+ if (value instanceof byte[]) {
+ return Variants.of(ByteBuffer.wrap((byte[]) value));
+ }
+ if (value instanceof BigDecimal) {
+ return Variants.of((BigDecimal) value);
+ }
+ if (value instanceof UUID) {
+ return Variants.ofUUID((UUID) value);
+ }
+ if (value instanceof Collection) {
+ ValueArray array = Variants.array();
+ for (Object element : (Collection<?>) value) {
+ array.add(objectToVariantValue(element, metadata));
+ }
+ return array;
+ }
+ if (value instanceof Map) {
+ Map<?, ?> map = (Map<?, ?>) value;
+ ShreddedObject object = Variants.object(metadata);
+ map.forEach((k, v) -> object.put(k.toString(), objectToVariantValue(v, metadata)));
+ return object;
+ }
+ throw new IllegalArgumentException("Cannot convert to variant: " + value.getClass().getName());
+ }
+
+ private static VariantValue numberToVariantValue(Number value) {
+ if (value instanceof Integer) {
+ return Variants.of((Integer) value);
+ }
+ if (value instanceof Long) {
+ return Variants.of((Long) value);
+ }
+ if (value instanceof Float) {
+ return Variants.of((Float) value);
+ }
+ if (value instanceof Double) {
+ return Variants.of((Double) value);
+ }
+ if (value instanceof Byte) {
+ return Variants.of((Byte) value);
+ }
+ if (value instanceof Short) {
+ return Variants.of((Short) value);
+ }
+ Number num = value;
+ if (num.doubleValue() == num.longValue()) {
+ return Variants.of(num.longValue());
+ }
+ return Variants.of(num.doubleValue());
Review Comment:
It seems safer to throw an exception here; otherwise we potentially lose data
(or at least this lossy fallback should be optional)?
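To make the data-loss concern concrete, here is a sketch that copies the fallback logic into a standalone method and compares it with a stricter variant. `BigInteger` is used because it is a `Number` subclass that `numberToVariantValue` does not special-case. The class and method names, and the choice to accept a `BigInteger` only when it fits exactly in a `long`, are assumptions for illustration.

```java
import java.math.BigInteger;

// Hypothetical standalone class used only to illustrate the fallback behavior.
final class NumberFallback {
  private NumberFallback() {}

  // Copy of the PR's fallback for Number subclasses it does not special-case.
  static Number lenientFallback(Number num) {
    if (num.doubleValue() == num.longValue()) {
      return num.longValue();
    }
    return num.doubleValue();
  }

  // Hypothetical stricter alternative: convert only when the result is exact, otherwise fail.
  static long strictFallback(Number num) {
    if (num instanceof BigInteger) {
      // longValueExact throws ArithmeticException when the value does not fit in a long.
      return ((BigInteger) num).longValueExact();
    }
    throw new IllegalArgumentException("Cannot convert to variant: " + num.getClass().getName());
  }
}
```

For 2^64 + 1, `longValue()` keeps only the low 64 bits (1) while `doubleValue()` rounds to 2^64, so the comparison fails and the lenient path silently returns the rounded double. The strict path throws instead.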
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java:
##########
@@ -881,6 +887,102 @@ public void testEvolveTypeDetectionStructNested() {
assertThat(updateMap.get("st.ff").type()).isInstanceOf(DoubleType.class);
}
+ @Test
Review Comment:
I think it might be more maintainable to write tests against
convertVariantValue instead of the static helpers.
Also it seems like there should be tests for convertVariantValue
specifically?
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -464,6 +474,119 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) {
return convertLocalDateTime(value);
}
+ protected Variant convertVariantValue(Object value) {
+ if (value instanceof Variant) {
+ return (Variant) value;
+ } else if (value instanceof ByteBuffer) {
+ return Variant.from((ByteBuffer) value);
+ }
+
+ List<String> allFieldNames =
+ collectFieldNames(value).stream().distinct().sorted().collect(Collectors.toList());
+ VariantMetadata metadata =
+ allFieldNames.isEmpty() ? Variants.emptyMetadata() : Variants.metadata(allFieldNames);
+ VariantValue variantValue = objectToVariantValue(value, metadata);
+ return Variant.of(metadata, variantValue);
+ }
+
+ /**
+ * Collects all field names (map keys) from the entire object tree. Used to build a single
+ * VariantMetadata for the whole Variant (required for nested maps).
+ */
+ private static List<String> collectFieldNames(Object value) {
+ if (value == null) {
+ return Lists.newArrayList();
+ }
+ if (value instanceof Map) {
+ Map<?, ?> map = (Map<?, ?>) value;
+ List<String> names = Lists.newArrayList();
+ map.keySet().stream().map(Object::toString).forEach(names::add);
+ for (Object v : map.values()) {
+ names.addAll(collectFieldNames(v));
+ }
+ return names;
+ }
+ if (value instanceof Collection) {
+ List<String> names = Lists.newArrayList();
+ for (Object element : (Collection<?>) value) {
+ names.addAll(collectFieldNames(element));
+ }
+ return names;
+ }
+ return Lists.newArrayList();
+ }
+
+ /**
+ * Recursively converts a Java object to a VariantValue using the given shared metadata for all
+ * nested maps. Handles primitives, List (array), and Map (object); map keys become field names.
+ */
+ private static VariantValue objectToVariantValue(Object value, VariantMetadata metadata) {
+ if (value == null) {
+ return Variants.ofNull();
+ }
+ if (value instanceof Boolean) {
+ return Variants.of((Boolean) value);
+ }
+ if (value instanceof Number) {
+ return numberToVariantValue((Number) value);
+ }
+ if (value instanceof String) {
+ return Variants.of((String) value);
+ }
+ if (value instanceof ByteBuffer) {
+ return Variants.of((ByteBuffer) value);
+ }
+ if (value instanceof byte[]) {
+ return Variants.of(ByteBuffer.wrap((byte[]) value));
+ }
+ if (value instanceof BigDecimal) {
Review Comment:
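I think this can be folded underneath the Number handling? As written,
BigDecimal is already matched by the earlier `instanceof Number` check, so
this branch is never reached.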
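Because `BigDecimal` extends `Number`, an `instanceof Number` check placed earlier always wins, so a later `BigDecimal` branch is dead code that the compiler does not flag. The sketch below shows the ordering issue with string labels standing in for the `Variants.of(...)` calls; `DispatchOrder`, `prOrder`, and `folded` are hypothetical names.

```java
import java.math.BigDecimal;

// Hypothetical standalone class used only to illustrate branch ordering.
final class DispatchOrder {
  private DispatchOrder() {}

  // Same order as the PR: the generic Number check runs before the BigDecimal check.
  static String prOrder(Object value) {
    if (value instanceof Number) {
      return "number";
    }
    if (value instanceof BigDecimal) {
      return "decimal"; // never reached: every BigDecimal is already a Number
    }
    return "other";
  }

  // Folded version: BigDecimal is tested inside the Number branch, before any generic fallback.
  static String folded(Object value) {
    if (value instanceof Number) {
      if (value instanceof BigDecimal) {
        return "decimal";
      }
      return "number";
    }
    return "other";
  }
}
```

In the PR this would amount to moving the `BigDecimal` case into `numberToVariantValue`, ahead of the `doubleValue()`/`longValue()` fallback.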
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -464,6 +474,119 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) {
return convertLocalDateTime(value);
}
+ protected Variant convertVariantValue(Object value) {
+ if (value instanceof Variant) {
+ return (Variant) value;
+ } else if (value instanceof ByteBuffer) {
+ return Variant.from((ByteBuffer) value);
+ }
+
+ List<String> allFieldNames =
+ collectFieldNames(value).stream().distinct().sorted().collect(Collectors.toList());
+ VariantMetadata metadata =
+ allFieldNames.isEmpty() ? Variants.emptyMetadata() : Variants.metadata(allFieldNames);
+ VariantValue variantValue = objectToVariantValue(value, metadata);
+ return Variant.of(metadata, variantValue);
+ }
+
+ /**
+ * Collects all field names (map keys) from the entire object tree. Used to build a single
+ * VariantMetadata for the whole Variant (required for nested maps).
+ */
+ private static List<String> collectFieldNames(Object value) {
+ if (value == null) {
+ return Lists.newArrayList();
+ }
+ if (value instanceof Map) {
+ Map<?, ?> map = (Map<?, ?>) value;
+ List<String> names = Lists.newArrayList();
+ map.keySet().stream().map(Object::toString).forEach(names::add);
Review Comment:
This also seems more consistent with the loop below on line 509
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.