gyang94 commented on code in PR #1877:
URL: https://github.com/apache/fluss/pull/1877#discussion_r2692929592


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PojoToRowConverter.java:
##########
@@ -77,12 +77,62 @@ public class PojoToRowConverter<T> {
 
     static {
         SUPPORTED_TYPES.put(DataTypeRoot.BOOLEAN, orderedSet(Boolean.class, 
boolean.class));
+
+        // Numeric types with widening support - include primitives and boxed 
types
         SUPPORTED_TYPES.put(DataTypeRoot.TINYINT, orderedSet(Byte.class, 
byte.class));
-        SUPPORTED_TYPES.put(DataTypeRoot.SMALLINT, orderedSet(Short.class, 
short.class));
-        SUPPORTED_TYPES.put(DataTypeRoot.INTEGER, orderedSet(Integer.class, 
int.class));
-        SUPPORTED_TYPES.put(DataTypeRoot.BIGINT, orderedSet(Long.class, 
long.class));
-        SUPPORTED_TYPES.put(DataTypeRoot.FLOAT, orderedSet(Float.class, 
float.class));
-        SUPPORTED_TYPES.put(DataTypeRoot.DOUBLE, orderedSet(Double.class, 
double.class));
+        SUPPORTED_TYPES.put(
+                DataTypeRoot.SMALLINT,
+                orderedSet(Short.class, short.class, Byte.class, byte.class));
+        SUPPORTED_TYPES.put(
+                DataTypeRoot.INTEGER,
+                orderedSet(
+                        Integer.class,
+                        int.class,
+                        Short.class,
+                        short.class,
+                        Byte.class,
+                        byte.class));
+        SUPPORTED_TYPES.put(
+                DataTypeRoot.BIGINT,
+                orderedSet(
+                        Long.class,
+                        long.class,
+                        Integer.class,
+                        int.class,
+                        Short.class,
+                        short.class,
+                        Byte.class,
+                        byte.class));
+        SUPPORTED_TYPES.put(
+                DataTypeRoot.FLOAT,
+                orderedSet(
+                        Float.class,
+                        float.class,
+                        Long.class,
+                        long.class,
+                        Integer.class,
+                        int.class,
+                        Short.class,
+                        short.class,
+                        Byte.class,
+                        byte.class));
+        SUPPORTED_TYPES.put(
+                DataTypeRoot.DOUBLE,
+                orderedSet(
+                        Double.class,
+                        double.class,
+                        Float.class,
+                        float.class,
+                        Long.class,
+                        long.class,
+                        Integer.class,
+                        int.class,
+                        Short.class,
+                        short.class,
+                        Byte.class,
+                        byte.class));

Review Comment:
   There are many duplicated code and definition for this kind of “type 
hierarchy”. We can simplify it with "incremental composition" or other ways. 
Here is a possible sample I am thinking of:
   ```
   //  1. basic class type definination
   Set<Class<?>> byteClasses = orderedSet(Byte.class, byte.class);
   Set<Class<?>> shortClasses = orderedSet(Short.class, short.class);
   Set<Class<?>> intClasses = orderedSet(Integer.class, int.class);
   Set<Class<?>> longClasses = orderedSet(Long.class, long.class);
   Set<Class<?>> floatClasses = orderedSet(Float.class, float.class);
   Set<Class<?>> doubleClasses = orderedSet(Double.class, double.class);
   
   // 2. combine them hierarchically.
   SUPPORTED_TYPES.put(DataTypeRoot.SMALLINT, 
       combine(shortClasses, byteClasses));
   
   SUPPORTED_TYPES.put(DataTypeRoot.INTEGER, 
       combine(intClasses, SUPPORTED_TYPES.get(DataTypeRoot.SMALLINT)));
   
   SUPPORTED_TYPES.put(DataTypeRoot.BIGINT, 
       combine(longClasses, SUPPORTED_TYPES.get(DataTypeRoot.INTEGER)));
   
   SUPPORTED_TYPES.put(DataTypeRoot.FLOAT, 
       combine(floatClasses, SUPPORTED_TYPES.get(DataTypeRoot.BIGINT)));
   
   SUPPORTED_TYPES.put(DataTypeRoot.DOUBLE, 
       combine(doubleClasses, SUPPORTED_TYPES.get(DataTypeRoot.FLOAT)));
   
   // 3. combine function
   private Set<Class<?>> combine(Set<Class<?>> primary, Set<Class<?>> 
inherited) {
       Set<Class<?>> result = new LinkedHashSet<>(primary);
       result.addAll(inherited);
       return Collections.unmodifiableSet(result);
   }
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PojoToRowConverter.java:
##########
@@ -335,6 +389,109 @@ public GenericRow convert(T pojo) {
         return row;
     }
 
+    // Numeric conversion methods for type widening
+    private static Object convertToTinyInt(Field field, Object value)
+            throws IllegalArgumentException {
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof Byte) {
+            return value;
+        }
+        throw new IllegalArgumentException(
+                "Field " + field.getName() + " cannot be converted to 
TINYINT");
+    }
+
+    private static Object convertToSmallInt(Field field, Object value)
+            throws IllegalArgumentException {
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof Short) {
+            return value;
+        }
+        if (value instanceof Byte) {
+            return ((Byte) value).shortValue();
+        }
+        throw new IllegalArgumentException(
+                "Field " + field.getName() + " cannot be converted to 
SMALLINT");
+    }
+
+    private static Object convertToInteger(Field field, Object value)
+            throws IllegalArgumentException {
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof Integer) {
+            return value;
+        }
+        if (value instanceof Short) {
+            return ((Short) value).intValue();
+        }
+        if (value instanceof Byte) {
+            return ((Byte) value).intValue();
+        }
+        throw new IllegalArgumentException(
+                "Field " + field.getName() + " cannot be converted to 
INTEGER");
+    }
+
+    private static Object convertToBigInt(Field field, Object value)
+            throws IllegalArgumentException {
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof Long) {
+            return value;
+        }
+        if (value instanceof Integer) {
+            return ((Integer) value).longValue();
+        }
+        if (value instanceof Short) {
+            return ((Short) value).longValue();
+        }
+        if (value instanceof Byte) {
+            return ((Byte) value).longValue();
+        }
+        throw new IllegalArgumentException(
+                "Field " + field.getName() + " cannot be converted to BIGINT");
+    }
+
+    private static Object convertToFloat(Field field, Object value)

Review Comment:
   Similar duplication for `if-else` in these series of `convertToXXX` 
functions. We can make a general `safeConvert` function.
   ```
   private static <T> T safeConvert(DataTypeRoot targetRoot, Field field, 
Object value, Function<Number, T> converter) {
       if (value == null) {
           return null;
       }
       
       // 1. validation (with SUPPORT_TYPES)
       Set<Class<?>> supported = SUPPORTED_TYPES.getOrDefault(targetRoot, 
Collections.emptySet());
       if (!supported.contains(value.getClass())) {
           throw new IllegalArgumentException(
               String.format("Field %s (type: %s) cannot be converted to %s", 
                   field.getName(), value.getClass().getSimpleName(), 
targetRoot));
       }
   
       // 2. convert
       if (value instanceof Number) {
           return converter.apply((Number) value);
       }
   
       throw new IllegalArgumentException("Unexpected value type for field " + 
field.getName());
   }
   ```
   Then in those `convertToXX` functions
   ```
   private static Object convertToTinyInt(Field field, Object value) {
       return safeConvert(DataTypeRoot.TINYINT, field, value, 
Number::byteValue);
   }
   
   private static Object convertToSmallInt(Field field, Object value) {
       return safeConvert(DataTypeRoot.SMALLINT, field, value, 
Number::shortValue);
   }
   
   ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to