Hey Geoffrey,

I was able to reproduce the error and will look into it in more detail tomorrow.


Regards,
Chesnay

On 12.10.2016 23:09, Geoffrey Mon wrote:
Hello,

Has anyone had a chance to look into this? I am currently working on the
problem but I have minimal understanding of how the internal Flink Python
API works; any expertise would be greatly appreciated.

Thank you very much!

Geoffrey

On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon <geof...@gmail.com> wrote:

Hi Chesnay,

Heh, I have discovered that if I do not restart Flink after running my
original problematic script, then similar issues will manifest themselves
in other otherwise working scripts. I haven't been able to completely
narrow down the problem, but I promise this new script will have a
ClassCastException that is completely reproducible. :)
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a

Thanks,
Geoffrey

On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler <ches...@apache.org>
wrote:

Hello Geoffrey,

this one works for me as well :D

Regards,
Chesnay

On 28.09.2016 05:38, Geoffrey Mon wrote:
Hello Chesnay,

Thank you for your help. After receiving your message I recompiled my
version of Flink completely, and both the NullPointerException listed in
the TODO and the ClassCastException with the join operation went away.
Previously, I had been only recompiling the modules of Flink that had
been
changed to save time using "mvn clean install -pl :module" and apparently
that may have been causing some of my issues.

Now, the problem is more clear: when a specific group reduce function in
my
research project plan file is used within an iteration, I get a
ClassCastException exception:
Caused by: java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
at

org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
at
org.apache.flink.runtime.iterative.io
.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
at

org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at

org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
at

org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
at

org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
at

org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
at

org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at

org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
at java.lang.Thread.run(Thread.java:745)

I'm not sure why this is causing an exception, and I would greatly
appreciate any assistance. I've revised the barebones error-causing plan
file to focus on this new error source:
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
The group reduce function in question seems to work just fine outside of
iterations. I have organized the commits and pushed to a new branch to
make
it easier to test and hopefully review soon:
https://github.com/GEOFBOT/flink/tree/new-iterations

Cheers,
Geoffrey

On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler <ches...@apache.org>
wrote:
Hello Geoffrey,

i could not reproduce this issue with the commits and plan you provided.

I tried out both the FLINK-4098 and bulk-iterations branches (and
reverted back to the specified commits) and built Flink from scratch.

Could you double check that the code you provided produces the error?
Also, which OS/python version are you using?

Regards,
Chesnay

On 20.09.2016 11:13, Chesnay Schepler wrote:
Hello,

I'll try to take a look this week.

Regards,
Chesnay

On 20.09.2016 02:38, Geoffrey Mon wrote:
Hello all,

I have recently been working on adding bulk iterations to the Python
API of
Flink in order to facilitate a research project I am working on. The
current changes can be seen in this GitHub diff:

https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
This implementation seems to work for, at least, simple examples,
such as
incrementing numbers in a data set. However, with the transformations
required for my project, I get an exception
"java.lang.ClassCastException:
[B cannot be cast to org.apache.flink.api.java.tuple.Tuple" thrown
from the
deserializers called by

org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
I've created the following simplified Python plan by stripping down my
research project code to the problem-causing parts:
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a

I have been working on this issue but I don't have any ideas on what
might
be the problem. Perhaps someone more knowledgeable about the interior
of
the Python API could kindly help?

Thank you very much.

Geoffrey Mon



Reply via email to