Stupid me. Thanks! Of course, it cannot work. I forgot to assign ds to itself:
ds = ds.x().distinct(); result = ds.collect(); I guess it was to late in the night ;) -Matthias On 06/25/2015 07:58 AM, Chiwan Park wrote: > Although you run > `ds.map(blahblah).sortPartition(blahblah).mapPartition(blahblah).distinct()`, > DataSet ds is not changed. > You should receive the result of transformation. > > So if you modify the code to `intermediateResult = blahblah; result = > intermediateResult.collect();`, the test works. > > Regards, > Chiwan Park > >> On Jun 25, 2015, at 10:03 AM, Matthias J. Sax >> <mj...@informatik.hu-berlin.de> wrote: >> >> Hi, >> >> I worked on rewriting flink-test according to >> https://issues.apache.org/jira/browse/FLINK-2275 >> >> In "org.apache.flink.test.javaApiOperators.SortPartitionITCase" I hit >> something strange. When I rewrite the code slightly differently, the >> test passes or fails and I have no idea why. >> >> The following code works (result is of type java.util.List) >> >>> result = ds >>> .map(new IdMapper()).setParallelism(4) // parallelize input >>> .sortPartition(1, Order.DESCENDING) >>> .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new >>> Tuple3Checker())) >>> .distinct().collect(); >> >> Rewriting the above as follows result in a failing test: >> >>> ds.map(new IdMapper()).setParallelism(4) // parallelize input >>> .sortPartition(1, Order.DESCENDING) >>> .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new >>> Tuple3Checker())) >>> .distinct(); >>> result = ds.collect(); >> >> I have no clue what the problem might be. The code looks semantically >> identical to me. Can anyone explain the difference? Do I miss anything? >> Or is this a bug? >> >> You can find the working version of the code in my github repo: >> https://github.com/mjsax/flink/tree/flink-2275-migrateFlinkTests >> >> >> -Matthias >> > > > > > >
signature.asc
Description: OpenPGP digital signature