michaelkoepf commented on code in PR #1404: URL: https://github.com/apache/fluss/pull/1404#discussion_r2231345919
########## fluss-client/src/main/java/com/alibaba/fluss/client/utils/ConverterUtils.java: ########## @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.utils; + +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.DataTypeRoot; +import com.alibaba.fluss.types.DecimalType; +import com.alibaba.fluss.types.LocalZonedTimestampType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.TimestampType; +import com.alibaba.fluss.utils.MapUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import 
java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Helper class for converting Java POJOs to Fluss's {@link InternalRow} format and vice versa. Review Comment: I think the precondition introduced here ("Java POJO") is not restrictive enough, because a [POJO](https://www.baeldung.com/java-pojo-class#what-is-a-pojo) does not necessarily have a default constructor, but the code assumes one. Maybe it is better to use the more standardized concept of a [Java Bean](https://www.baeldung.com/java-pojo-class#what-is-a-javabean)? While this comes with boilerplate and means that the fields will be `private` and users have to access them via getters and setters, Java users won't mind much: Java beans are everywhere, so they are used to them. An additional benefit is that by saying we expect a Java bean, there _must be_ a public no-argument constructor and public setters, so we do not need to tinker with the `setAccessible` reflection calls. ########## fluss-client/src/main/java/com/alibaba/fluss/client/utils/ConverterUtils.java: ########## @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.alibaba.fluss.client.utils; + +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.DataTypeRoot; +import com.alibaba.fluss.types.DecimalType; +import com.alibaba.fluss.types.LocalZonedTimestampType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.TimestampType; +import com.alibaba.fluss.utils.MapUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Helper class for converting Java POJOs to Fluss's {@link InternalRow} format and vice versa. + * + * <p>This utility uses reflection to map fields from POJOs to InternalRow and back based on a given + * schema. It includes caching mechanisms to avoid repeated reflection operations for the same POJO + * types. 
+ * + * <p>Example usage: + * + * <pre>{@code + * // Create a converter + * ConverterUtils<Order> converter = new ConverterUtils<>(Order.class, rowType); + * + * // Convert a POJO to GenericRow + * Order order = new Order(1001L, 5001L, 10, "123 Athens"); + * GenericRow row = converter.toRow(order); + * + * // Convert a GenericRow back to POJO + * Order convertedOrder = converter.fromRow(row); + * }</pre> + * + * <p>Note: Nested POJO fields are not supported in the current implementation. + * + * @param <T> The POJO type to convert + */ +public class ConverterUtils<T> { + private static final Logger LOG = LoggerFactory.getLogger(ConverterUtils.class); + + /** Cache for converters to avoid repeated reflection operations. */ + private static final ConcurrentHashMap<CacheKey, ConverterUtils<?>> CONVERTER_CACHE = + MapUtils.newConcurrentHashMap(); + + /** Map of supported Java types for each DataTypeRoot. */ + private static final Map<DataTypeRoot, Set<Class<?>>> SUPPORTED_TYPES = new HashMap<>(); + + static { + SUPPORTED_TYPES.put(DataTypeRoot.BOOLEAN, orderedSet(Boolean.class, boolean.class)); + SUPPORTED_TYPES.put(DataTypeRoot.TINYINT, orderedSet(Byte.class, byte.class)); + SUPPORTED_TYPES.put(DataTypeRoot.SMALLINT, orderedSet(Short.class, short.class)); + SUPPORTED_TYPES.put(DataTypeRoot.INTEGER, orderedSet(Integer.class, int.class)); + SUPPORTED_TYPES.put(DataTypeRoot.BIGINT, orderedSet(Long.class, long.class)); + SUPPORTED_TYPES.put(DataTypeRoot.FLOAT, orderedSet(Float.class, float.class)); + SUPPORTED_TYPES.put(DataTypeRoot.DOUBLE, orderedSet(Double.class, double.class)); + SUPPORTED_TYPES.put( + DataTypeRoot.CHAR, orderedSet(String.class, Character.class, char.class)); + SUPPORTED_TYPES.put( + DataTypeRoot.STRING, orderedSet(String.class, Character.class, char.class)); + SUPPORTED_TYPES.put(DataTypeRoot.BINARY, orderedSet(byte[].class)); + SUPPORTED_TYPES.put(DataTypeRoot.BYTES, orderedSet(byte[].class)); + SUPPORTED_TYPES.put(DataTypeRoot.DECIMAL, 
orderedSet(BigDecimal.class)); + SUPPORTED_TYPES.put(DataTypeRoot.DATE, orderedSet(LocalDate.class)); + SUPPORTED_TYPES.put(DataTypeRoot.TIME_WITHOUT_TIME_ZONE, orderedSet(LocalTime.class)); + SUPPORTED_TYPES.put( + DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, orderedSet(LocalDateTime.class)); + SUPPORTED_TYPES.put( + DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + orderedSet(Instant.class, OffsetDateTime.class)); + + // TODO: Add more types when https://github.com/apache/fluss/issues/816 is merged + } + + /** Interface for field conversion from POJO field to Fluss InternalRow field. */ + private interface FieldToRowConverter { + Object convert(Object obj) throws IllegalAccessException; + } + + /** Interface for field conversion from Fluss InternalRow field to POJO field. */ + private interface RowToFieldConverter { + Object convert(InternalRow row, int pos) throws IllegalAccessException; + } + + private final Class<T> pojoClass; + private final RowType rowType; + private final FieldToRowConverter[] fieldToRowConverters; + private final RowToFieldConverter[] rowToFieldConverters; + private final Field[] pojoFields; + private final Constructor<T> defaultConstructor; + + /** + * Creates a new converter for the specified POJO class and row type. 
+ * + * @param pojoClass The class of POJOs to convert + * @param rowType The row schema to use for conversion + */ + @SuppressWarnings("unchecked") + public ConverterUtils(Class<T> pojoClass, RowType rowType) { + this.pojoClass = pojoClass; + this.rowType = rowType; + + // Create converters for each field + this.pojoFields = new Field[rowType.getFieldCount()]; + this.fieldToRowConverters = createFieldToRowConverters(); + this.rowToFieldConverters = createRowToFieldConverters(); + + // Get the default constructor for creating new instances + try { + this.defaultConstructor = pojoClass.getDeclaredConstructor(); + this.defaultConstructor.setAccessible(true); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException( + "POJO class " + pojoClass.getName() + " must have a default constructor", e); + } + } + + /** + * Gets a cached converter for the specified POJO class and row type, or creates a new one if + * not found in the cache. + * + * @param pojoClass The class of POJOs to convert + * @param rowType The row schema to use for conversion + * @param <T> The POJO type + * @return A converter for the specified POJO class and row type + */ + @SuppressWarnings("unchecked") + public static <T> ConverterUtils<T> getConverter(Class<T> pojoClass, RowType rowType) { + CacheKey key = new CacheKey(pojoClass, rowType); + return (ConverterUtils<T>) + CONVERTER_CACHE.computeIfAbsent(key, k -> new ConverterUtils<>(pojoClass, rowType)); + } + + /** Creates field converters for converting from POJO to Row for each field in the schema. 
*/ + private FieldToRowConverter[] createFieldToRowConverters() { + FieldToRowConverter[] converters = new FieldToRowConverter[rowType.getFieldCount()]; + + for (int i = 0; i < rowType.getFieldCount(); i++) { + String fieldName = rowType.getFieldNames().get(i); + DataType fieldType = rowType.getTypeAt(i); + + // Find field in POJO class + Field field = findField(pojoClass, fieldName); + if (field != null) { + pojoFields[i] = field; + + // Check if the field type is supported + if (!SUPPORTED_TYPES.containsKey(fieldType.getTypeRoot())) { + throw new UnsupportedOperationException( + "Unsupported field type " + + fieldType.getTypeRoot() + + " for field " + + field.getName()); + } + + // Create the appropriate converter for this field + converters[i] = createFieldToRowConverter(fieldType, field); + } else { + LOG.warn( + "Field '{}' not found in POJO class {}. Will return null for this field.", + fieldName, + pojoClass.getName()); + converters[i] = obj -> null; Review Comment: Are there any valid cases where the field cannot be found in the POJO class? If not, maybe it is better to throw an exception because this would, imo, indicate a schema mismatch (i.e., wrong user input). ########## fluss-client/src/main/java/com/alibaba/fluss/client/utils/ConverterUtils.java: ########## @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.utils; + +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.DataTypeRoot; +import com.alibaba.fluss.types.DecimalType; +import com.alibaba.fluss.types.LocalZonedTimestampType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.TimestampType; +import com.alibaba.fluss.utils.MapUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Helper class for converting Java POJOs to Fluss's {@link InternalRow} format and vice versa. + * + * <p>This utility uses reflection to map fields from POJOs to InternalRow and back based on a given + * schema. It includes caching mechanisms to avoid repeated reflection operations for the same POJO + * types. 
+ * + * <p>Example usage: + * + * <pre>{@code + * // Create a converter + * ConverterUtils<Order> converter = new ConverterUtils<>(Order.class, rowType); + * + * // Convert a POJO to GenericRow + * Order order = new Order(1001L, 5001L, 10, "123 Athens"); + * GenericRow row = converter.toRow(order); + * + * // Convert a GenericRow back to POJO + * Order convertedOrder = converter.fromRow(row); + * }</pre> + * + * <p>Note: Nested POJO fields are not supported in the current implementation. + * + * @param <T> The POJO type to convert + */ +public class ConverterUtils<T> { + private static final Logger LOG = LoggerFactory.getLogger(ConverterUtils.class); + + /** Cache for converters to avoid repeated reflection operations. */ + private static final ConcurrentHashMap<CacheKey, ConverterUtils<?>> CONVERTER_CACHE = + MapUtils.newConcurrentHashMap(); + + /** Map of supported Java types for each DataTypeRoot. */ + private static final Map<DataTypeRoot, Set<Class<?>>> SUPPORTED_TYPES = new HashMap<>(); + + static { + SUPPORTED_TYPES.put(DataTypeRoot.BOOLEAN, orderedSet(Boolean.class, boolean.class)); + SUPPORTED_TYPES.put(DataTypeRoot.TINYINT, orderedSet(Byte.class, byte.class)); + SUPPORTED_TYPES.put(DataTypeRoot.SMALLINT, orderedSet(Short.class, short.class)); + SUPPORTED_TYPES.put(DataTypeRoot.INTEGER, orderedSet(Integer.class, int.class)); + SUPPORTED_TYPES.put(DataTypeRoot.BIGINT, orderedSet(Long.class, long.class)); + SUPPORTED_TYPES.put(DataTypeRoot.FLOAT, orderedSet(Float.class, float.class)); + SUPPORTED_TYPES.put(DataTypeRoot.DOUBLE, orderedSet(Double.class, double.class)); + SUPPORTED_TYPES.put( + DataTypeRoot.CHAR, orderedSet(String.class, Character.class, char.class)); + SUPPORTED_TYPES.put( + DataTypeRoot.STRING, orderedSet(String.class, Character.class, char.class)); + SUPPORTED_TYPES.put(DataTypeRoot.BINARY, orderedSet(byte[].class)); + SUPPORTED_TYPES.put(DataTypeRoot.BYTES, orderedSet(byte[].class)); + SUPPORTED_TYPES.put(DataTypeRoot.DECIMAL, 
orderedSet(BigDecimal.class)); + SUPPORTED_TYPES.put(DataTypeRoot.DATE, orderedSet(LocalDate.class)); + SUPPORTED_TYPES.put(DataTypeRoot.TIME_WITHOUT_TIME_ZONE, orderedSet(LocalTime.class)); + SUPPORTED_TYPES.put( + DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, orderedSet(LocalDateTime.class)); + SUPPORTED_TYPES.put( + DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + orderedSet(Instant.class, OffsetDateTime.class)); + + // TODO: Add more types when https://github.com/apache/fluss/issues/816 is merged + } + + /** Interface for field conversion from POJO field to Fluss InternalRow field. */ + private interface FieldToRowConverter { + Object convert(Object obj) throws IllegalAccessException; + } + + /** Interface for field conversion from Fluss InternalRow field to POJO field. */ + private interface RowToFieldConverter { + Object convert(InternalRow row, int pos) throws IllegalAccessException; + } + + private final Class<T> pojoClass; + private final RowType rowType; + private final FieldToRowConverter[] fieldToRowConverters; + private final RowToFieldConverter[] rowToFieldConverters; + private final Field[] pojoFields; + private final Constructor<T> defaultConstructor; + + /** + * Creates a new converter for the specified POJO class and row type. + * + * @param pojoClass The class of POJOs to convert + * @param rowType The row schema to use for conversion + */ + @SuppressWarnings("unchecked") + public ConverterUtils(Class<T> pojoClass, RowType rowType) { Review Comment: Is there any reason for users _not_ to use the caching mechanism? If there is no reason to bypass the cache, we could make the constructor private, so that users have exactly one way to obtain an instance of this class: the `getConverter` method. ########## fluss-client/src/main/java/com/alibaba/fluss/client/utils/ConverterUtils.java: ########## @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.utils; + +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.DataTypeRoot; +import com.alibaba.fluss.types.DecimalType; +import com.alibaba.fluss.types.LocalZonedTimestampType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.TimestampType; +import com.alibaba.fluss.utils.MapUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Helper class for converting Java POJOs to Fluss's 
{@link InternalRow} format and vice versa. + * + * <p>This utility uses reflection to map fields from POJOs to InternalRow and back based on a given + * schema. It includes caching mechanisms to avoid repeated reflection operations for the same POJO + * types. + * + * <p>Example usage: + * + * <pre>{@code + * // Create a converter + * ConverterUtils<Order> converter = new ConverterUtils<>(Order.class, rowType); + * + * // Convert a POJO to GenericRow + * Order order = new Order(1001L, 5001L, 10, "123 Athens"); + * GenericRow row = converter.toRow(order); + * + * // Convert a GenericRow back to POJO + * Order convertedOrder = converter.fromRow(row); + * }</pre> + * + * <p>Note: Nested POJO fields are not supported in the current implementation. + * + * @param <T> The POJO type to convert + */ +public class ConverterUtils<T> { + private static final Logger LOG = LoggerFactory.getLogger(ConverterUtils.class); + + /** Cache for converters to avoid repeated reflection operations. */ + private static final ConcurrentHashMap<CacheKey, ConverterUtils<?>> CONVERTER_CACHE = + MapUtils.newConcurrentHashMap(); + + /** Map of supported Java types for each DataTypeRoot. 
*/ + private static final Map<DataTypeRoot, Set<Class<?>>> SUPPORTED_TYPES = new HashMap<>(); + + static { + SUPPORTED_TYPES.put(DataTypeRoot.BOOLEAN, orderedSet(Boolean.class, boolean.class)); + SUPPORTED_TYPES.put(DataTypeRoot.TINYINT, orderedSet(Byte.class, byte.class)); + SUPPORTED_TYPES.put(DataTypeRoot.SMALLINT, orderedSet(Short.class, short.class)); + SUPPORTED_TYPES.put(DataTypeRoot.INTEGER, orderedSet(Integer.class, int.class)); + SUPPORTED_TYPES.put(DataTypeRoot.BIGINT, orderedSet(Long.class, long.class)); + SUPPORTED_TYPES.put(DataTypeRoot.FLOAT, orderedSet(Float.class, float.class)); + SUPPORTED_TYPES.put(DataTypeRoot.DOUBLE, orderedSet(Double.class, double.class)); + SUPPORTED_TYPES.put( + DataTypeRoot.CHAR, orderedSet(String.class, Character.class, char.class)); + SUPPORTED_TYPES.put( + DataTypeRoot.STRING, orderedSet(String.class, Character.class, char.class)); + SUPPORTED_TYPES.put(DataTypeRoot.BINARY, orderedSet(byte[].class)); + SUPPORTED_TYPES.put(DataTypeRoot.BYTES, orderedSet(byte[].class)); + SUPPORTED_TYPES.put(DataTypeRoot.DECIMAL, orderedSet(BigDecimal.class)); + SUPPORTED_TYPES.put(DataTypeRoot.DATE, orderedSet(LocalDate.class)); + SUPPORTED_TYPES.put(DataTypeRoot.TIME_WITHOUT_TIME_ZONE, orderedSet(LocalTime.class)); + SUPPORTED_TYPES.put( + DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, orderedSet(LocalDateTime.class)); + SUPPORTED_TYPES.put( + DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + orderedSet(Instant.class, OffsetDateTime.class)); + + // TODO: Add more types when https://github.com/apache/fluss/issues/816 is merged + } + + /** Interface for field conversion from POJO field to Fluss InternalRow field. */ + private interface FieldToRowConverter { + Object convert(Object obj) throws IllegalAccessException; + } + + /** Interface for field conversion from Fluss InternalRow field to POJO field. 
*/ + private interface RowToFieldConverter { + Object convert(InternalRow row, int pos) throws IllegalAccessException; + } + + private final Class<T> pojoClass; + private final RowType rowType; + private final FieldToRowConverter[] fieldToRowConverters; + private final RowToFieldConverter[] rowToFieldConverters; + private final Field[] pojoFields; + private final Constructor<T> defaultConstructor; + + /** + * Creates a new converter for the specified POJO class and row type. + * + * @param pojoClass The class of POJOs to convert + * @param rowType The row schema to use for conversion + */ + @SuppressWarnings("unchecked") + public ConverterUtils(Class<T> pojoClass, RowType rowType) { + this.pojoClass = pojoClass; + this.rowType = rowType; + + // Create converters for each field + this.pojoFields = new Field[rowType.getFieldCount()]; + this.fieldToRowConverters = createFieldToRowConverters(); + this.rowToFieldConverters = createRowToFieldConverters(); + + // Get the default constructor for creating new instances + try { + this.defaultConstructor = pojoClass.getDeclaredConstructor(); + this.defaultConstructor.setAccessible(true); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException( + "POJO class " + pojoClass.getName() + " must have a default constructor", e); + } + } + + /** + * Gets a cached converter for the specified POJO class and row type, or creates a new one if + * not found in the cache. 
+ * + * @param pojoClass The class of POJOs to convert + * @param rowType The row schema to use for conversion + * @param <T> The POJO type + * @return A converter for the specified POJO class and row type + */ + @SuppressWarnings("unchecked") + public static <T> ConverterUtils<T> getConverter(Class<T> pojoClass, RowType rowType) { + CacheKey key = new CacheKey(pojoClass, rowType); + return (ConverterUtils<T>) + CONVERTER_CACHE.computeIfAbsent(key, k -> new ConverterUtils<>(pojoClass, rowType)); + } + + /** Creates field converters for converting from POJO to Row for each field in the schema. */ + private FieldToRowConverter[] createFieldToRowConverters() { + FieldToRowConverter[] converters = new FieldToRowConverter[rowType.getFieldCount()]; + + for (int i = 0; i < rowType.getFieldCount(); i++) { + String fieldName = rowType.getFieldNames().get(i); + DataType fieldType = rowType.getTypeAt(i); + + // Find field in POJO class + Field field = findField(pojoClass, fieldName); + if (field != null) { + pojoFields[i] = field; + + // Check if the field type is supported + if (!SUPPORTED_TYPES.containsKey(fieldType.getTypeRoot())) { + throw new UnsupportedOperationException( + "Unsupported field type " + + fieldType.getTypeRoot() + + " for field " + + field.getName()); + } + + // Create the appropriate converter for this field + converters[i] = createFieldToRowConverter(fieldType, field); + } else { + LOG.warn( + "Field '{}' not found in POJO class {}. Will return null for this field.", + fieldName, + pojoClass.getName()); + converters[i] = obj -> null; + } + } + + return converters; + } + + /** Creates field converters for converting from Row to POJO for each field in the schema. 
*/ + private RowToFieldConverter[] createRowToFieldConverters() { + RowToFieldConverter[] converters = new RowToFieldConverter[rowType.getFieldCount()]; + + for (int i = 0; i < rowType.getFieldCount(); i++) { + DataType fieldType = rowType.getTypeAt(i); + Field field = pojoFields[i]; + + if (field != null) { + // Create the appropriate converter for this field + converters[i] = createRowToFieldConverter(fieldType, field); + } else { + // Field not found in POJO + converters[i] = (row, pos) -> null; + } + } + + return converters; + } + + /** + * Finds a field in the given class or its superclasses. + * + * @param clazz The class to search + * @param fieldName The name of the field to find + * @return The field, or null if not found + */ + @Nullable + private Field findField(Class<?> clazz, String fieldName) { + Class<?> currentClass = clazz; + while (currentClass != null) { + try { + Field field = currentClass.getDeclaredField(fieldName); + field.setAccessible(true); + return field; + } catch (NoSuchFieldException e) { + currentClass = currentClass.getSuperclass(); + } + } + return null; + } + + /** + * Creates a field converter for converting from POJO to Row for a specific field based on its + * data type. + * + * @param fieldType The Fluss data type + * @param field The Java reflection field + * @return A converter for this field + */ + private FieldToRowConverter createFieldToRowConverter(DataType fieldType, Field field) { + field.setAccessible(true); + + switch (fieldType.getTypeRoot()) { + case BOOLEAN: + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + case FLOAT: + case DOUBLE: + case BINARY: + case BYTES: + return field::get; + case CHAR: + case STRING: + return obj -> { + Object value = field.get(obj); + return value == null ? 
null : BinaryString.fromString(value.toString()); + }; + case DECIMAL: + return obj -> { + Object value = field.get(obj); + if (value == null) { + return null; + } + if (value instanceof BigDecimal) { + DecimalType decimalType = (DecimalType) fieldType; + return Decimal.fromBigDecimal( + (BigDecimal) value, + decimalType.getPrecision(), + decimalType.getScale()); + } else { + throw new IllegalArgumentException( + String.format( + "Field %s is not a BigDecimal. Cannot convert to DecimalData.", + field.getName())); + } + }; + case DATE: + return obj -> { + Object value = field.get(obj); + if (value == null) { + return null; + } + if (value instanceof LocalDate) { + return (int) ((LocalDate) value).toEpochDay(); + } else { + LOG.warn( + "Field {} is not a LocalDate. Cannot convert to int days.", + field.getName()); + return null; + } + }; + case TIME_WITHOUT_TIME_ZONE: + return obj -> { + Object value = field.get(obj); + if (value == null) { + return null; + } + if (value instanceof LocalTime) { + LocalTime localTime = (LocalTime) value; + return (int) (localTime.toNanoOfDay() / 1_000_000); + } else { + LOG.warn( + "Field {} is not a LocalTime. Cannot convert to int millis.", + field.getName()); + return null; + } + }; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return obj -> { + Object value = field.get(obj); + if (value == null) { + return null; + } + if (value instanceof LocalDateTime) { + return TimestampNtz.fromLocalDateTime((LocalDateTime) value); + } else { + LOG.warn( + "Field {} is not a LocalDateTime. 
Cannot convert to TimestampData.", + field.getName()); + return null; + } + }; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return obj -> { + Object value = field.get(obj); + if (value == null) { + return null; + } + if (value instanceof Instant) { + return TimestampLtz.fromInstant((Instant) value); + } else if (value instanceof OffsetDateTime) { + OffsetDateTime offsetDateTime = (OffsetDateTime) value; + return TimestampLtz.fromInstant(offsetDateTime.toInstant()); + } else { + LOG.warn( + "Field {} is not an Instant or OffsetDateTime. Cannot convert to TimestampData.", + field.getName()); + return null; + } + }; + default: + LOG.warn( + "Unsupported type {} for field {}. Will use null for it.", + fieldType.getTypeRoot(), + field.getName()); + return obj -> null; Review Comment: Maybe it would be better to throw an exception when the user attempts to convert a field with an unsupported data type, instead of silently converting it to null? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
