Hi Fabian,

Thank you for the quick reply and for looking into it.

Sorry, I was a bit too quick to blame the field references. It turns out my TypeInformation was wrong, hence the invalid reference exception.

However, the type erasure problem still holds.

The actual code can be found here [1]. It runs fine in the local execution environment, and it also runs on the cluster when a non-Pojo type is used for T (e.g. java.lang.Long). For Pojo types, however, it fails on the cluster with a type-erasure-related exception. Hence, I manually created the TypeInformation for the Embedding class:

public static <T> TypeInformation<Embedding<T>> getType(Class<T> clazz) {
  TypeInformation<T> type = TypeInformation.of(clazz);
  TypeInformation<T[]> arrayType = ObjectArrayTypeInfo.getInfoFor(type);
  return new TupleTypeInfo<>(arrayType, arrayType);
}

and for the EmbeddingWithTiePoint class:

public static <T> TypeInformation<EmbeddingWithTiePoint<T>> getType(Class<T> clazz) {
  TypeInformation<T> type = TypeInformation.of(clazz);
  TypeInformation<Embedding<T>> embeddingType = Embedding.getType(clazz);
  return new TupleTypeInfo<>(type, embeddingType);
}
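
Called, for instance, with Long as in the working non-Pojo case:

TypeInformation<EmbeddingWithTiePoint<Long>> info =
  EmbeddingWithTiePoint.getType(Long.class);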

Note that this produces the same TypeInformation as the automatic type extraction does in the local, working scenario.
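
A quick sketch of how the two can be compared (TypeHint is only used here to trigger the automatic extraction):

TypeInformation<Embedding<Long>> manual = Embedding.getType(Long.class);
TypeInformation<Embedding<Long>> extracted =
  TypeInformation.of(new TypeHint<Embedding<Long>>() {});
// printed and compared both in the local setup to confirm they match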

I provided the type info to the UDF that initially creates the EmbeddingWithTiePoint instances [1]:

DataSet<EmbeddingWithTiePoint<K>> initialEmbeddings = vertices
  .filter(new ElementHasCandidate<>(traversalCode.getStep(0).getFrom()))
  .map(new BuildEmbeddingWithTiePoint<>(keyClass, traversalCode, vertexCount, edgeCount))
  .returns(EmbeddingWithTiePoint.getType(keyClass));

However, Flink tells me that I now need to provide the same type information at every place where the output is of type EmbeddingWithTiePoint [2], [3] (a sketch of one such hint follows the stack trace below). If I do so, the program fails with a class cast exception:

Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.tuples.EmbeddingWithTiePoint
    at org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.functions.UpdateEdgeMappings.join(UpdateEdgeMappings.java:50)
    at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:149)
    at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:222)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
    at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
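
For reference, the hint at [2]/[3] looks roughly like this (a sketch; the dataset names, join keys and constructor arguments are placeholders, the exact code is in the linked lines):

DataSet<EmbeddingWithTiePoint<K>> nextEmbeddings = embeddings
  .join(edgeSteps)
  .where(0).equalTo(0) // placeholder keys
  .with(new UpdateEdgeMappings<>(traversalCode))
  .returns(EmbeddingWithTiePoint.getType(keyClass));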

I guess the issue is not really the missing TypeInformation, but something that is done differently between local and cluster execution when Pojo types are involved. Maybe it is related to the generic array creation via reflection? Hope this helps.
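
(For clarity, by generic array creation via reflection I mean the usual pattern, roughly:

@SuppressWarnings("unchecked")
T[] mappings = (T[]) java.lang.reflect.Array.newInstance(clazz, size);

where clazz and size are just illustrative names.)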

Best, Martin

[1] https://github.com/dbs-leipzig/gradoop/blob/master/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/matching/single/preserving/explorative/DistributedTraverser.java#L170

[2] https://github.com/dbs-leipzig/gradoop/blob/master/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/matching/single/preserving/explorative/DistributedTraverser.java#L215

[3] https://github.com/dbs-leipzig/gradoop/blob/master/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/matching/single/preserving/explorative/DistributedTraverser.java#L234

On 19.10.2016 09:33, Fabian Hueske wrote:
Hi Martin,

thanks for reporting the problem and providing code to reproduce it.

Would you mind describing the problem with the forwarding annotations in
more detail?
I would be interested in the error message and in how the semantic annotation
is provided (@ForwardedFields or withForwardedFields()).

Thanks, Fabian

2016-10-19 8:52 GMT+02:00 Martin Junghanns <m.jungha...@mailbox.org>:

Hi,

I am running into a type erasure problem which only occurs when I execute
the code using a Flink cluster (1.1.2). I created a Gist [1] which
reproduces the problem. I also added a unit test to show that it does not
fail in local and collection mode.

Maybe it is also interesting to mention that - in my actual code - I
manually created a TypeInformation (the same one that is automatically
created during local execution) and passed it to the operators using
.returns(..). However, this led to my field forwarding annotations
failing with invalid reference exceptions (the same annotations that
work locally).

The issue came up after I generalized the core of one of our algorithms.
Before, when the types were non-generic, this ran without problems locally
and on the cluster.

Thanks in advance!

Cheers, Martin

[1] https://gist.github.com/s1ck/caf9f3f46e7a5afe6f6a73c479948fec

The exception in the Gist case:

The return type of function 'withPojo(Problem.java:58)' could not be
determined automatically, due to type erasure. You can give type
information hints by using the returns(...) method on the result of the
transformation call, or by letting your function implement the
'ResultTypeQueryable' interface.
     org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
     org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
     org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
     Problem.withPojo(Problem.java:60)
     Problem.main(Problem.java:38)


