liuml07 commented on PR #25576:
URL: https://github.com/apache/flink/pull/25576#issuecomment-2657459257

   @reswqa Yes, agreed: a vertex is indeed a low-level concept. I still think
some information about the error is more useful than the built-in
`ArrayIndexOutOfBoundsException` when the user needs to debug. Any suggestions
for the error message? I'm thinking of making it clearer by including the
inEdge, as follows:
   ```
   Preconditions.checkState(
           inputIndex < inputSerializers.length,
           "Could not find valid input serializers when creating job graph for edge: %s",
           inEdge);
   ```
   
   Sample: 
   > ```Could not find valid input serializers when creating job graph for edge: (Map-2 -> Sink: Print to Std. Out-3, typeNumber=0, outputPartitioner=FORWARD, exchangeMode=UNDEFINED, bufferTimeout=100, outputTag=null, uniqueId=0)```
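
   To illustrate the difference outside of Flink, here is a minimal, self-contained sketch. The `checkState` helper is a hypothetical stand-in for `Preconditions.checkState`, written with `String.format`. `serializerFor`, the `String[]` used in place of the serializer array, and the edge text are placeholders for illustration only, not the actual job graph code.

```java
public class CheckStateSketch {
    // Hypothetical stand-in for Flink's Preconditions.checkState: throws
    // IllegalStateException with a formatted message when the condition is false.
    public static void checkState(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalStateException(String.format(template, args));
        }
    }

    // Placeholder lookup: fails with a message naming the edge instead of
    // letting the array access throw ArrayIndexOutOfBoundsException.
    public static String serializerFor(String[] inputSerializers, int inputIndex, String inEdge) {
        checkState(
                inputIndex < inputSerializers.length,
                "Could not find valid input serializers when creating job graph for edge: %s",
                inEdge);
        return inputSerializers[inputIndex];
    }

    public static void main(String[] args) {
        String[] noSerializers = new String[0]; // simulates a missing serializer
        try {
            serializerFor(noSerializers, 0, "(Map-2 -> Sink: Print to Std. Out-3)");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Without the check, the same lookup would fail with a bare index in the exception message, which tells the user nothing about which part of the job is affected.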
   
   Regarding the scenario of the failing example, the user job I ran into a few
months ago used a custom `TypeInformation` that was not implemented
correctly. More specifically, the `TypeSerializer` it returned was null in a
corner case. I can't share the real user code here, and it has since been
fixed, but the following dummy example reproduces the error:
   ```java
   import org.apache.flink.api.common.ExecutionConfig;
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   import java.util.stream.Collectors;
   import java.util.stream.IntStream;

   public class MissingSerDeserInTypeInformation {
       // Simple value holder used as the element type of the stream.
       public static class MyObject {
           private final Integer value;
           MyObject(Integer value) {
               this.value = value;
           }
           @Override
           public String toString() {
               return "MyObject(" + value + ")";
           }
       }

       // Deliberately broken TypeInformation: it never provides a serializer.
       public static class MyObjectTypeInfo extends TypeInformation<MyObject> {
           public boolean isBasicType() {return false;}
           public boolean isTupleType() {return false;}
           public int getArity() {return 0;}
           public int getTotalFields() {return 0;}
           public Class<MyObject> getTypeClass() {return MyObject.class;}
           public boolean isKeyType() {return false;}
           public TypeSerializer<MyObject> createSerializer(ExecutionConfig executionConfig) {
               return null; // should always return a valid serializer
           }
           public String toString() {return "MyObjectTypeInfo";}
           public boolean equals(Object o) {return false;}
           public int hashCode() {return 0;}
           public boolean canEqual(Object o) {return false;}
       }

       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.getConfig().disableGenericTypes();
           // The map output is typed with the broken MyObjectTypeInfo, so the
           // downstream sink has no valid input serializer.
           env.fromCollection(IntStream.range(1, 20).boxed().collect(Collectors.toList()))
                   .map(MyObject::new)
                   .returns(new MyObjectTypeInfo())
                   .print();
           env.execute();
       }
   }
   ```
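
   On the user side, a null serializer like the one in this example could also be caught closer to its source with a fail-fast check. The sketch below is plain Java and only an illustration: `requireSerializer` and the `Supplier` standing in for a `createSerializer` call are hypothetical names, not part of Flink or of this PR.

```java
import java.util.Objects;
import java.util.function.Supplier;

public class SerializerNullCheckSketch {
    // Hypothetical helper: runs a serializer-producing call and fails fast
    // with a descriptive message if the call yields null.
    public static <S> S requireSerializer(Supplier<S> factory, String typeName) {
        return Objects.requireNonNull(
                factory.get(),
                "createSerializer returned null for type " + typeName);
    }

    public static void main(String[] args) {
        try {
            // Simulates the corner case where no serializer is produced.
            requireSerializer(() -> null, "MyObject");
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```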

