twalthr commented on a change in pull request #18011:
URL: https://github.com/apache/flink/pull/18011#discussion_r762969909



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
##########
@@ -1501,10 +1502,13 @@ Tuple toExternalImpl(RowData value) {
     }
 
     /** Converter for case class. */
-    public static final class CaseClassConverter extends AbstractRowDataConverter<Product> {
+    public static final class CaseClassConverter extends AbstractRowDataConverter<Object> {
 
         private static final long serialVersionUID = -966598627968372952L;
 
+        private static final Class<?> PRODUCT_CLASS = getProductClass();

Review comment:
       Mark both as `@Nullable`.

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
##########
@@ -1515,21 +1519,58 @@ public CaseClassConverter(TupleTypeInfoBase t, DataType[] fieldTypes) {
         }
 
         @Override
-        RowData toInternalImpl(Product value) {
+        RowData toInternalImpl(Object value) {
             GenericRowData genericRow = new GenericRowData(t.getArity());
             for (int i = 0; i < t.getArity(); i++) {
-                genericRow.setField(i, converters[i].toInternal(value.productElement(i)));
+                genericRow.setField(i, converters[i].toInternal(invokeProductElement(value, i)));
             }
             return genericRow;
         }
 
         @Override
-        Product toExternalImpl(RowData value) {
+        Object toExternalImpl(RowData value) {
             Object[] fields = new Object[t.getArity()];
             for (int i = 0; i < t.getArity(); i++) {
                 fields[i] = converters[i].toExternal(value, i);
             }
-            return (Product) serializer.createInstance(fields);
+            return serializer.createInstance(fields);
+        }
+
+        private static Class<?> getProductClass() {
+            try {
+                return Class.forName(
+                        "scala.Product", false, DataFormatConverter.class.getClassLoader());

Review comment:
       Use the thread context classloader instead.
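
A minimal sketch of what a lookup through the thread context class loader could look like. The class name `OptionalClassLookup` and the method `find` are hypothetical and not part of the pull request; the fallback to the defining class loader is an assumption for the case where no context class loader is set.

```java
// Hypothetical helper, not taken from the pull request.
final class OptionalClassLookup {

    private OptionalClassLookup() {}

    /** Returns the named class, or null if it is not visible to the chosen class loader. */
    static Class<?> find(String className) {
        // Prefer the context class loader of the calling thread.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            // Assumption: fall back to the loader that defined this helper.
            loader = OptionalClassLookup.class.getClassLoader();
        }
        try {
            // initialize=false: locate the class without running its static initializers.
            return Class.forName(className, false, loader);
        } catch (ClassNotFoundException e) {
            // The class (for example scala.Product) is not on the classpath.
            return null;
        }
    }
}
```

Under these assumptions, `OptionalClassLookup.find("scala.Product")` would return `null` on a classpath without Scala rather than failing.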

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
##########
@@ -1515,21 +1516,40 @@ public CaseClassConverter(TupleTypeInfoBase t, DataType[] fieldTypes) {
         }
 
         @Override
-        RowData toInternalImpl(Product value) {
+        RowData toInternalImpl(Object value) {
             GenericRowData genericRow = new GenericRowData(t.getArity());
             for (int i = 0; i < t.getArity(); i++) {
-                genericRow.setField(i, converters[i].toInternal(value.productElement(i)));
+                try {
+                    genericRow.setField(
+                            i,
+                            converters[i].toInternal(
+                                    PRODUCT_CLASS
+                                            .getMethod("productElement", int.class)
+                                            .invoke(value, i)));
+                } catch (Throwable e) {
+                    // Ignore, this should never happen unless scala.Product breaks its API
+                }
             }
             return genericRow;
         }
 
         @Override
-        Product toExternalImpl(RowData value) {
+        Object toExternalImpl(RowData value) {
             Object[] fields = new Object[t.getArity()];
             for (int i = 0; i < t.getArity(); i++) {
                 fields[i] = converters[i].toExternal(value, i);
             }
-            return (Product) serializer.createInstance(fields);
+            return serializer.createInstance(fields);
+        }
+
+        private static Class<?> getProductClass() {
+            try {
+                return Class.forName(
+                        "scala.Product", false, DataFormatConverter.class.getClassLoader());
+            } catch (ClassNotFoundException e) {
+                // Ignore, no scala available in the classpath
+                return null;

Review comment:
       You have to consider that the instance might also be serialized and 
executed at two different locations: the client side and the task manager side. 
It is not guaranteed that the same classpath exists in both settings. By "on 
access" I mean callers of the `toInternal` method, so this should not be noisy 
for non-Scala developers.
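
One way to read the "on access" idea is to defer the reflective lookup until the first call and keep the resolved method out of the serialized state, so each JVM resolves it against its own classpath. The sketch below illustrates that under stated assumptions: `LazyElementAccessor`, its fields, and `elementAt` are hypothetical names, and the class and method to look up are passed in as strings so the example runs without Scala on the classpath.

```java
import java.io.Serializable;
import java.lang.reflect.Method;

// Hypothetical sketch, not taken from the pull request.
final class LazyElementAccessor implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String className;
    private final String methodName;

    // transient: not shipped with the serialized instance, so the client and
    // the task manager each resolve the method against their own classpath.
    private transient Method accessor;

    LazyElementAccessor(String className, String methodName) {
        // Nothing is looked up here, so construction never fails
        // when the target class is missing.
        this.className = className;
        this.methodName = methodName;
    }

    /** Calls the int-indexed accessor on the given value, resolving it on first use. */
    Object elementAt(Object value, int index) {
        try {
            return resolve().invoke(value, index);
        } catch (ReflectiveOperationException e) {
            // Only callers that actually reach this path see the failure.
            throw new IllegalStateException(
                    "Could not call " + className + "#" + methodName + "(int)", e);
        }
    }

    private Method resolve() throws ReflectiveOperationException {
        if (accessor == null) {
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            if (loader == null) {
                loader = LazyElementAccessor.class.getClassLoader();
            }
            accessor = Class.forName(className, false, loader).getMethod(methodName, int.class);
        }
        return accessor;
    }
}
```

With this shape, an instance created as `new LazyElementAccessor("scala.Product", "productElement")` would only fail when `elementAt` is called on a classpath that lacks Scala, which keeps the failure away from users who never convert case classes.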




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
