hemanthsavasere commented on code in PR #1877:
URL: https://github.com/apache/fluss/pull/1877#discussion_r2540898649


##########
website/docs/engine-flink/datastream.mdx:
##########
@@ -164,6 +164,83 @@ public class OrderDeserializationSchema implements FlussDeserializationSchema<Or
 }
 ```
 
+#### Using POJO Converter
+
+The `fluss-flink-common` module also provides a `PojoToRowConverter` utility for working with POJOs. Note, however, that it converts in the POJO-to-row direction; deserializing rows to POJOs still requires manual field extraction, as the following schema shows:
+
+```java
+public class PojoDeserializationSchema implements FlussDeserializationSchema<Order> {
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        // Nothing to initialize here: PojoToRowConverter (from
+        // org.apache.fluss.flink.utils) only covers the POJO -> row
+        // direction, so there is no converter to set up for reading.
+    }
+
+    @Override
+    public Order deserialize(LogRecord record) throws Exception {
+        InternalRow row = record.getRow();
+
+        // Row -> POJO conversion uses manual field extraction,
+        // exactly as in the previous example.
+        return new Order(
+            row.getLong(0),
+            row.getLong(1),
+            row.getInt(2),
+            row.getString(3).toString()
+        );
+    }
+
+    @Override
+    public TypeInformation<Order> getProducedType(RowType rowSchema) {
+        return TypeInformation.of(Order.class);
+    }
+}
+```
+
+**Note:** The `PojoToRowConverter` is primarily designed for converting POJOs to Fluss rows (for writing). For reading (deserializing rows to POJOs), you need to manually extract fields from the `InternalRow` as shown in the examples above.
+
+##### Numeric Type Widening for POJO Converters
+
+When using `PojoToRowConverter` for serialization, the converter automatically widens numeric types following Java's safe widening rules. This allows your POJOs to use smaller numeric types even when the table schema specifies larger types (see the sketch after the list below).
+
+**Supported Widenings**:
+- `byte` → `short`, `int`, `long`, `float`, `double`
+- `short` → `int`, `long`, `float`, `double`
+- `int` → `long`, `float`, `double`
+- `long` → `float`, `double`
+- `float` → `double`
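+
+For example, a POJO like the following hypothetical `Metric` class (not part of the Fluss codebase; shown only to illustrate the widenings above) can be written to a table whose schema declares wider numeric columns:
+
+```java
+// Hypothetical POJO: each Java field is narrower than its Fluss column type,
+// relying on the safe widenings listed above.
+public class Metric {
+    private int count;   // table column declared BIGINT: int -> long
+    private float ratio; // table column declared DOUBLE: float -> double
+
+    public int getCount() { return count; }
+    public void setCount(int count) { this.count = count; }
+    public float getRatio() { return ratio; }
+    public void setRatio(float ratio) { this.ratio = ratio; }
+}
+```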

Review Comment:
   I have not implemented BigInteger support because Apache Iceberg deliberately does not include a native BigInteger type. Instead, it uses decimal(P, S) with P ≤ 38 for arbitrary-precision fixed-point numbers, and Paimon takes the same approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
