I think Jorge is correct that the output is a Tuple2 consisting of the two
tuples (one from each relation) that have been joined.
Looking at JavaJoinOperatorTest.java and SparkJoinOperatorTest.java: if you
define a join operator as SparkJoinOperator<Tuple2, Tuple2, Integer>, the
output will be a Tuple2<Tuple2, Tuple2>.
So I assume that in your case you need a join operator with the type
parameters <Record, Record, TypeofKey>. The order of the type parameters may
be different in the API.
Best
--
Zoi
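A minimal, self-contained sketch of the typing Zoi describes. It does not use Wayang: `Tuple2` below is a simplified stand-in loosely modelled on `org.apache.wayang.basic.data.Tuple2` (assumed to expose `field0`/`field1`), and the join is a plain nested loop, not Wayang's operator implementation. It only illustrates that the output element type follows from the two input types: joining pairs with pairs on an Integer key yields `Tuple2<Tuple2, Tuple2>`, so joining `Record` with `Record` would analogously yield `Tuple2<Record, Record>`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class JoinTypingSketch {

    // Simplified stand-in for a pair type such as Wayang's Tuple2 (assumption: two fields).
    static final class Tuple2<A, B> {
        final A field0;
        final B field1;

        Tuple2(A field0, B field1) {
            this.field0 = field0;
            this.field1 = field1;
        }
    }

    // Nested-loop equi-join: each matching (left, right) pair is emitted unchanged,
    // so the output element type is Tuple2<L, R> whatever L and R are.
    static <L, R, K> List<Tuple2<L, R>> join(List<L> left, List<R> right,
                                             Function<L, K> leftKey, Function<R, K> rightKey) {
        List<Tuple2<L, R>> out = new ArrayList<>();
        for (L l : left) {
            K key = leftKey.apply(l);
            for (R r : right) {
                if (Objects.equals(key, rightKey.apply(r))) {
                    out.add(new Tuple2<>(l, r));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Mirrors the <Tuple2, Tuple2, Integer> typing: pairs joined on their Integer first field.
        List<Tuple2<Integer, String>> r = List.of(new Tuple2<>(1, "x"), new Tuple2<>(2, "y"));
        List<Tuple2<Integer, String>> s = List.of(new Tuple2<>(2, "z"), new Tuple2<>(3, "w"));
        List<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>> joined =
                join(r, s, t -> t.field0, t -> t.field0);
        Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>> first = joined.get(0);
        System.out.println(joined.size() + " match: " + first.field0.field1 + "/" + first.field1.field1);
    }
}
```

Running `main` prints `1 match: y/z`: the only match is on key 2, and both original elements survive intact inside the output pair.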
On Saturday, 22 April 2023 at 09:14:59 p.m. CEST, jorge Arnulfo Quiané Ruiz
<[email protected]> wrote:
Hi guys,
Please check whether it is indeed tuple1 and tuple2, because I might be
confusing it; it may instead be a key/value pair, where the key is the join
key. This might affect your cast to Record.
—
Jorge
On Fri, 21 Apr 2023 at 2:14 PM Kristian Reitzel <[email protected]>
wrote:
> Makes sense. We made it work with Tuple2 as the input type of the
> MapOperator, so connectTo now works, but we still get an exception.
>
> We think we have traced the issue to the point where the join finds two
> fields with the same value (i.e., when the join condition is satisfied).
> Then we get the following exception:
>
> Caused by: java.lang.ClassCastException: class org.apache.wayang.basic.data.Tuple2 cannot be cast to class org.apache.wayang.basic.data.Record (org.apache.wayang.basic.data.Tuple2 and org.apache.wayang.basic.data.Record are in unnamed module of loader 'app')
> 	at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor$MapFunctionImpl.apply(WayangProjectVisitor.java:48)
> 	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> 	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> 	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> 	at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1239)
> 	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> 	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> 	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
> 	at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
> 	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
> 	at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
> 	at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> 	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> 	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> 	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
> 	at org.apache.wayang.java.operators.JavaLocalCallbackSink.evaluate(JavaLocalCallbackSink.java:72)
> 	at org.apache.wayang.java.execution.JavaExecutor.execute(JavaExecutor.java:82)
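The top frame shows the projection's map function (`WayangProjectVisitor$MapFunctionImpl.apply`) receiving a `Tuple2` where it expects a `Record`, which is exactly what Jorge's explanation predicts. The sketch below reproduces that failing cast and shows one way a projection could read from the joined pair directly. `Row` and `Pair` are hypothetical stand-ins for `Record` and `Tuple2`, and this is not the actual `MapFunctionImpl` code. It assumes field indices follow Calcite's join row type, i.e. the left input's fields followed by the right input's fields.

```java
import java.util.Arrays;

public class JoinedProjectionSketch {

    // Hypothetical stand-in for a Record: an ordered array of field values.
    static final class Row {
        final Object[] values;

        Row(Object... values) { this.values = values; }

        int size() { return values.length; }

        Object getField(int i) { return values[i]; }
    }

    // Hypothetical stand-in for the join output Tuple2<Record, Record>.
    static final class Pair<A, B> {
        final A field0;
        final B field1;

        Pair(A field0, B field1) {
            this.field0 = field0;
            this.field1 = field1;
        }
    }

    // Projects fields of the logical joined row (left fields first, then right fields)
    // by unpacking the pair instead of casting it to a Row.
    static Row project(Pair<Row, Row> joined, int... indices) {
        Row left = joined.field0;
        Row right = joined.field1;
        Object[] out = new Object[indices.length];
        for (int j = 0; j < indices.length; j++) {
            int i = indices[j];
            out[j] = i < left.size() ? left.getField(i) : right.getField(i - left.size());
        }
        return new Row(out);
    }

    public static void main(String[] args) {
        Pair<Row, Row> joined = new Pair<>(new Row("a", "b", "c", "d"), new Row("e", "a", "f"));
        Object input = joined;
        try {
            Row wrong = (Row) input; // what a map function expecting a Record effectively does
            System.out.println(wrong.size());
        } catch (ClassCastException e) {
            System.out.println("cast fails: " + e.getClass().getSimpleName());
        }
        // Index 1 is the left row's 2nd field; index 6 is the right row's 3rd field.
        System.out.println(Arrays.toString(project(joined, 1, 6).values));
    }
}
```

Running `main` prints `cast fails: ClassCastException` followed by `[b, f]`.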
>
> -----Original Message-----
> From: Jorge Arnulfo Quiané Ruiz <[email protected]>
> Sent: Friday, 21 April 2023 13.58
> To: [email protected]
> Subject: Re: Problem: Connect Join to Project in the SQL API
>
> Hi Michelle and Kristian,
>
> Yes, this is confusing at first glance, because you would expect a joined
> record as output. However, that would mean the join operator has to
> decide which kind of join to implement (e.g., left or right join). That
> is, given the following two tuples from R and S, respectively, which match
> on the join attribute a:
> — <a, b, c, d>
> — <e, a, f>
> One could have several kinds of output:
> — <a, b, c, d, e, a, f>
> — <a, b, c, d, e, f> (without duplicating the join attribute)
> — <a, b, c, d>
> — <e, a, f>
> — …
> Therefore, in Wayang we left this decision to the developer and chose to
> output both tuples together as a Tuple2:
> — <a, b, c, d>, <e, a, f>
> The downstream operators must then materialise such a join match.
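To make the alternatives in the list concrete, here is a minimal sketch that builds the first two outputs from the matching tuples of R and S. Tuples are modelled as plain `List<String>`, and the join attribute positions (index 0 in R, index 1 in S) are assumptions for this example only; per Jorge's explanation, Wayang's join emits neither shape and leaves that choice to the downstream operators.

```java
import java.util.ArrayList;
import java.util.List;

public class JoinOutputShapes {

    // Concatenates both tuples, keeping the join attribute twice: <a, b, c, d, e, a, f>.
    static List<String> concatWithDuplicate(List<String> r, List<String> s) {
        List<String> out = new ArrayList<>(r);
        out.addAll(s);
        return out;
    }

    // Concatenates both tuples but drops S's copy of the join attribute: <a, b, c, d, e, f>.
    static List<String> concatWithoutDuplicate(List<String> r, List<String> s, int rightKeyIndex) {
        List<String> out = new ArrayList<>(r);
        for (int i = 0; i < s.size(); i++) {
            if (i != rightKeyIndex) {
                out.add(s.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> r = List.of("a", "b", "c", "d"); // tuple from R, join attribute at index 0
        List<String> s = List.of("e", "a", "f");      // tuple from S, join attribute at index 1
        System.out.println(concatWithDuplicate(r, s));
        System.out.println(concatWithoutDuplicate(r, s, 1));
    }
}
```

Running `main` prints `[a, b, c, d, e, a, f]` and then `[a, b, c, d, e, f]`. The second shape needs to know where the join attribute sits in S, which is one reason a generic join operator cannot pick it on the developer's behalf.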
>
> Therefore, in your case, the MapOperator that follows the join should
> receive a Tuple2 as input.
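A hedged sketch of the option Jorge describes: a map step whose input is the joined pair and whose output is a single flat row, composed in front of a projection that still expects a row. `Row`, `Pair`, `MATERIALISE` and `projection` are hypothetical names, not Wayang's `Record`, `Tuple2` or `MapOperator` API. The idea is that the composed function is what a MapOperator with a Tuple2 input type would wrap, while the existing Record-based projection logic stays unchanged.

```java
import java.util.Arrays;
import java.util.function.Function;

public class MaterialiseJoinSketch {

    // Hypothetical stand-in for a Record: an ordered array of field values.
    static final class Row {
        final Object[] values;

        Row(Object... values) { this.values = values; }
    }

    // Hypothetical stand-in for the join output Tuple2<Record, Record>.
    static final class Pair<A, B> {
        final A field0;
        final B field1;

        Pair(A field0, B field1) {
            this.field0 = field0;
            this.field1 = field1;
        }
    }

    // Step 1: materialise the join match as one row (left fields, then right fields).
    static final Function<Pair<Row, Row>, Row> MATERIALISE = pair -> {
        Object[] l = pair.field0.values;
        Object[] r = pair.field1.values;
        Object[] all = Arrays.copyOf(l, l.length + r.length);
        System.arraycopy(r, 0, all, l.length, r.length);
        return new Row(all);
    };

    // Step 2: a Row-to-Row projection that can stay unchanged once its input is a Row.
    static Function<Row, Row> projection(int... indices) {
        return row -> {
            Object[] out = new Object[indices.length];
            for (int j = 0; j < indices.length; j++) {
                out[j] = row.values[indices[j]];
            }
            return new Row(out);
        };
    }

    public static void main(String[] args) {
        Pair<Row, Row> match = new Pair<>(new Row("a", "b", "c", "d"), new Row("e", "a", "f"));
        Function<Pair<Row, Row>, Row> pipeline = MATERIALISE.andThen(projection(0, 4));
        System.out.println(Arrays.toString(pipeline.apply(match).values));
    }
}
```

Running `main` prints `[a, e]`. The design choice here is to keep the join generic and push the "which join output" decision into one small, explicit function placed after it.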
>
> Could you please tell us why changing the input type of the MapOperator is
> not working?
>
> Best,
> Jorge
>
> > On 21 Apr 2023, at 11.56, Kristian Reitzel <[email protected]> wrote:
> >
> > Hi Dev,
> >
> > We are working on the Join Operator in the SQL API.
> >
> > After connecting the two table scans of type Record, the output slots of
> > the join operator are of type Tuple2.
> >
> > This confuses us, since we would expect the result of a join to be of
> > type Record. Furthermore, the projection (which happens after the join)
> > throws the following exception:
> >
> > Exception in thread "main" java.lang.IllegalArgumentException: Cannot connect out@Join[2->1, id=23ad2d17] of DataSetType[BasicDataUnitType[Tuple2]] to in@Map[1->1, id=6bce4140] of type DataSetType[BasicDataUnitType[Record]].
> > 	at org.apache.wayang.core.plan.wayangplan.Operator.connectTo(Operator.java:206)
> > 	at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor.visit(WayangProjectVisitor.java:42)
> > 	at org.apache.wayang.api.sql.calcite.converter.WayangRelConverter.convert(WayangRelConverter.java:16)
> > 	at org.apache.wayang.api.sql.calcite.optimizer.Optimizer.convert(Optimizer.java:202)
> > 	at org.apache.wayang.api.sql.context.SqlContext.executeSql(SqlContext.java:92)
> > 	at SqlAPI.examplePostgres(SqlAPI.java:46)
> > 	at SqlAPI.main(SqlAPI.java:77)
> >
> > We tried changing the inputTypeClass of the Wayang MapOperator to Tuple2,
> > but this did not fix the problem.
> >
> > Can anyone explain why the JoinOperator outputs a Tuple2, and perhaps
> > also how we should then connect the output of the join to the input of
> > the projection?
> >
> > Best,
> > Michelle and Kristian
> >
>
>