twalthr commented on a change in pull request #18011:
URL: https://github.com/apache/flink/pull/18011#discussion_r762915428



##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
##########
@@ -1515,21 +1516,40 @@ public CaseClassConverter(TupleTypeInfoBase t, 
DataType[] fieldTypes) {
         }
 
         @Override
-        RowData toInternalImpl(Product value) {
+        RowData toInternalImpl(Object value) {
             GenericRowData genericRow = new GenericRowData(t.getArity());
             for (int i = 0; i < t.getArity(); i++) {
-                genericRow.setField(i, 
converters[i].toInternal(value.productElement(i)));
+                try {
+                    genericRow.setField(
+                            i,
+                            converters[i].toInternal(
+                                    PRODUCT_CLASS

Review comment:
       can we store the method in a transient variable that will be filled on 
access?
   It would make the code a bit more performant.
   ```
   PRODUCT_CLASS.getMethod("productElement", int.class)
   ```

##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
##########
@@ -1515,21 +1516,40 @@ public CaseClassConverter(TupleTypeInfoBase t, 
DataType[] fieldTypes) {
         }
 
         @Override
-        RowData toInternalImpl(Product value) {
+        RowData toInternalImpl(Object value) {
             GenericRowData genericRow = new GenericRowData(t.getArity());
             for (int i = 0; i < t.getArity(); i++) {
-                genericRow.setField(i, 
converters[i].toInternal(value.productElement(i)));
+                try {
+                    genericRow.setField(
+                            i,
+                            converters[i].toInternal(
+                                    PRODUCT_CLASS
+                                            .getMethod("productElement", 
int.class)
+                                            .invoke(value, i)));
+                } catch (Throwable e) {
+                    // Ignore, this should never happen unless scala.Product 
breaks its API
+                }
             }
             return genericRow;
         }
 
         @Override
-        Product toExternalImpl(RowData value) {
+        Object toExternalImpl(RowData value) {
             Object[] fields = new Object[t.getArity()];
             for (int i = 0; i < t.getArity(); i++) {
                 fields[i] = converters[i].toExternal(value, i);
             }
-            return (Product) serializer.createInstance(fields);
+            return serializer.createInstance(fields);
+        }
+
+        private static Class<?> getProductClass() {
+            try {
+                return Class.forName(
+                        "scala.Product", false, 
DataFormatConverter.class.getClassLoader());
+            } catch (ClassNotFoundException e) {
+                // Ignore, no scala available in the classpath
+                return null;

Review comment:
       throw a helpful exception on first access of `PRODUCT_CLASS`

##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
##########
@@ -1515,21 +1516,40 @@ public CaseClassConverter(TupleTypeInfoBase t, 
DataType[] fieldTypes) {
         }
 
         @Override
-        RowData toInternalImpl(Product value) {
+        RowData toInternalImpl(Object value) {
             GenericRowData genericRow = new GenericRowData(t.getArity());
             for (int i = 0; i < t.getArity(); i++) {
-                genericRow.setField(i, 
converters[i].toInternal(value.productElement(i)));
+                try {
+                    genericRow.setField(
+                            i,
+                            converters[i].toInternal(
+                                    PRODUCT_CLASS
+                                            .getMethod("productElement", 
int.class)
+                                            .invoke(value, i)));
+                } catch (Throwable e) {
+                    // Ignore, this should never happen unless scala.Product 
breaks its API

Review comment:
       let's throw an exception with the cause. You never now what the future 
will bring.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to