Hi,
This issue has been resolved. The problem was incorrect input: with that
input the program never satisfied the convergence criterion, so it kept
running forever.
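
For context, the kind of check involved can be sketched in plain Java as below. This is only a minimal illustration, not the Flink job itself: the class `L1ConvergenceSketch`, the methods `l1NormDiff` and `hasConverged`, and the threshold value are all assumptions made for the example. It sums the absolute per-key differences between two successive vectors and compares the total with a threshold; an iteration whose input never drives that total below the threshold will not stop on this check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; names and threshold are assumptions.
class L1ConvergenceSketch {

    // Assumed threshold, chosen only for illustration.
    static final double EPSILON = 1e-6;

    // Sum of absolute differences between the new and the previous value
    // of every key. Keys missing from `previous` are treated as 0.0.
    static double l1NormDiff(Map<Long, Double> previous, Map<Long, Double> current) {
        double sum = 0.0;
        for (Map.Entry<Long, Double> entry : current.entrySet()) {
            double old = previous.getOrDefault(entry.getKey(), 0.0);
            sum += Math.abs(entry.getValue() - old);
        }
        return sum;
    }

    // Converged when the aggregated difference drops below the threshold.
    static boolean hasConverged(Map<Long, Double> previous, Map<Long, Double> current) {
        return l1NormDiff(previous, current) < EPSILON;
    }

    public static void main(String[] args) {
        Map<Long, Double> previous = new HashMap<>();
        previous.put(1L, 0.5);
        previous.put(2L, 0.5);

        Map<Long, Double> current = new HashMap<>();
        current.put(1L, 0.75);
        current.put(2L, 0.25);

        System.out.println("L1 difference: " + l1NormDiff(previous, current));
        System.out.println("Converged: " + hasConverged(previous, current));
    }
}
```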

Regards,
Janani


On Mon, Jul 7, 2014 at 3:14 PM, Janani Chakkaradhari <[email protected]> wrote:

> Hi Stephan,
>
> Thanks for the reply. Below I have pasted the code for the other variant,
> which runs faster than the one in the previous email. I have also tried to
> answer your questions here.
>
> *1) Your code uses an extra join for the convergence check. Does the other
> variant you talk about also introduce an additional join?*
>
> Yes, but it is a filter join. In my case I don't want to filter the tuples
> during the iteration. Instead, I want to check for convergence at the end
> of every superstep and stop the entire iteration when the convergence check
> is true.
>
>
> *2) The join you introduce, does it increase the data volume (find
> multiple matching partners per record) such that more records are emitted?*
>
> No. In the code from the previous email, each tuple emitted from
> DampingMapper is joined with exactly one tuple in the iteration. Hence the
> final join (L1_NormDiff()) in the iteration emits exactly one tuple for
> each join key.
>
> For example, in the PageRank algorithm, the newly computed rank of each
> page (each tuple) is compared with its value from the previous iteration,
> and the L1_NormDiff join computes the difference. I want to aggregate this
> difference over all the pages in the current iteration and check, at the
> end of each superstep, whether the aggregated value is below a certain
> threshold.
>
> Bulk Iteration:
>
>         DataSet<Tuple2<Long, Double>> edgeScores = d
>                 .map(new InitializeRandomVector()).name("V");
>
>         IterativeDataSet<Tuple2<Long, Double>> iteration = edgeScores
>                 .iterate(maxIterations)
>                 .name("EdgeScoreVector_BulkIteration");
>
>         DataSet<Tuple2<Long, Double>> new_edgeScores = iteration
>                 .join(d).where(0).equalTo(0)
>                     .with(new V1_HadamardProduct()).name("V1")
>                 .join(srcIncMat).where(0).equalTo(0)
>                     .with(new V2_SrcIncWithV1()).name("V2")
>                 .groupBy(0).aggregate(Aggregations.SUM, 1)
>                 .join(tarIncMat).where(0).equalTo(1)
>                     .with(new V3_TarIncWithV2()).name("V3")
>                 .map(new DampingMapper(c, numEdges));
>
>         DataSet<Tuple2<Long, Double>> convergedVector = iteration.closeWith(
>                 new_edgeScores,
>                 new_edgeScores.join(iteration).where(0).equalTo(0)
>                         .filter(new EpsilonFilter()));
>
>
>     public static final class EpsilonFilter
>             extends FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
>
>         @Override
>         public boolean filter(
>                 Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
>             return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
>         }
>     }
>
> Thanks,
> Janani
>
>
> On Mon, Jul 7, 2014 at 12:13 PM, Stephan Ewen <[email protected]> wrote:
>
>> Hi Janani!
>>
>> Can you give us the code of the other variant, for comparison?
>>
>> Also, some quick questions to help figure this out:
>>
>> 1) Your code uses an extra join for the convergence check. Does the other
>> variant you talk about also introduce an additional join?
>>
>> 2) The join you introduce, does it increase the data volume (find multiple
>> matching partners per record) such that more records are emitted?
>>
>> Stephan
>>
>
>
