Hi guys,

Please check whether it really is tuple1 and tuple2 (the two matching tuples), because I might have confused it and it could be a key/value pair instead, where the key is the join key. This might affect your cast to Record.
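A minimal sketch of that check: before casting, inspect what the join's pair actually holds, and only flatten it into a single record once both sides are confirmed to be records. It assumes, following Jorge's earlier message, that a join match is a pair of (left record, right record). `SimpleRecord` and `Pair` are hypothetical stand-ins for Wayang's `Record` and `Tuple2` so that the example compiles with the JDK alone. The real map function in `WayangProjectVisitor` would need equivalent logic written against the actual Wayang classes and their accessors.

```java
import java.util.Arrays;

class JoinOutputCheck {
    // Flattens one join match into a single record, after checking what the pair holds.
    // Per the stack trace, casting the whole Tuple2 straight to Record is what fails.
    static SimpleRecord flatten(Object joinOutput) {
        if (!(joinOutput instanceof Pair)) {
            throw new IllegalArgumentException("Expected a pair, got " + joinOutput.getClass().getName());
        }
        Pair<?, ?> pair = (Pair<?, ?>) joinOutput;
        if (pair.field0 instanceof SimpleRecord && pair.field1 instanceof SimpleRecord) {
            SimpleRecord left = (SimpleRecord) pair.field0;
            SimpleRecord right = (SimpleRecord) pair.field1;
            // Left fields first, then right fields.
            Object[] merged = Arrays.copyOf(left.values, left.size() + right.size());
            System.arraycopy(right.values, 0, merged, left.size(), right.size());
            return new SimpleRecord(merged);
        }
        // Any other shape (e.g. a key/value pair keyed on the join key) is reported, not cast.
        throw new IllegalStateException("Unexpected pair contents: "
                + describe(pair.field0) + ", " + describe(pair.field1));
    }

    private static String describe(Object o) {
        return o == null ? "null" : o.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        Object match = new Pair<>(new SimpleRecord("a", "b", "c", "d"), new SimpleRecord("e", "a", "f"));
        System.out.println(flatten(match)); // [a, b, c, d, e, a, f]
    }
}

// Hypothetical stand-in for org.apache.wayang.basic.data.Record (not the real class).
final class SimpleRecord {
    final Object[] values;
    SimpleRecord(Object... values) { this.values = values; }
    int size() { return values.length; }
    Object getField(int index) { return values[index]; }
    @Override public String toString() { return Arrays.toString(values); }
}

// Hypothetical stand-in for org.apache.wayang.basic.data.Tuple2 (not the real class).
final class Pair<A, B> {
    final A field0;
    final B field1;
    Pair(A field0, B field1) { this.field0 = field0; this.field1 = field1; }
}
```

Failing loudly on an unexpected shape makes it immediately visible which of the two layouts the join really produces.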
— Jorge

On Fri, 21 Apr 2023 at 2:14 PM Kristian Reitzel <[email protected]> wrote:

> Makes sense. We made it work with the Tuple2 as input to the MapOperator.
> So now the connectTo works, but we still get an exception.
>
> We think we have narrowed the issue down to the case where the join
> finds two fields with the same value (i.e. the join condition is
> satisfied). Then we get the following exception:
>
> Caused by: java.lang.ClassCastException: class
> org.apache.wayang.basic.data.Tuple2 cannot be cast to class
> org.apache.wayang.basic.data.Record (org.apache.wayang.basic.data.Tuple2
> and org.apache.wayang.basic.data.Record are in unnamed module of loader
> 'app')
>     at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor$MapFunctionImpl.apply(WayangProjectVisitor.java:48)
>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1239)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>     at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
>     at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at org.apache.wayang.java.operators.JavaLocalCallbackSink.evaluate(JavaLocalCallbackSink.java:72)
>     at org.apache.wayang.java.execution.JavaExecutor.execute(JavaExecutor.java:82)
>
> -----Original Message-----
> From: Jorge Arnulfo Quiané Ruiz <[email protected]>
> Sent: Friday, 21 April 2023 13.58
> To: [email protected]
> Subject: Re: Problem: Connect Join to Project in the SQL API
>
> Hi Michelle and Kristian,
>
> Yeah, this is confusing at first glance because you would expect a joined
> record as output. However, this would mean that the join operator has to
> decide the kind of join to implement (i.e. left or right join). For
> example, given the following two tuples (matching on the join attribute a)
> from R and S, respectively:
> — <a, b, c, d>
> — <e, a, f>
> One could have several kinds of outputs:
> — <a, b, c, d, e, a, f>
> — <a, b, c, d, e, f> (without duplicating the join attribute)
> — <a, b, c, d>
> — <e, a, f>
> — …..
> Therefore, in Wayang we left the decision to the developer and decided to
> output both tuples (Tuple2) as follows:
> — <a, b, c, d>, <e, a, f>
> The following operators must materialise such a join match.
>
> Therefore, in your case, the following MapOperator should receive a Tuple2
> as input.
>
> Could you please tell us why changing the input type of the MapOperator is
> not working?
>
> Best,
> Jorge
>
> On 21 Apr 2023, at 11.56, Kristian Reitzel <[email protected]> wrote:
>
> > Hi Dev,
> >
> > We are working on the Join Operator in the SQL API.
> >
> > After connecting the two table scans of type Record, the output slots of
> > the join operator are of type Tuple2.
> >
> > This confuses us since we would expect the result of a join to be of
> > type Record. Furthermore, the projection (which happens after the join)
> > throws the following exception:
> >
> > Exception in thread "main" java.lang.IllegalArgumentException: Cannot
> > connect out@Join[2->1, id=23ad2d17] of
> > DataSetType[BasicDataUnitType[Tuple2]] to in@Map[1->1, id=6bce4140] of
> > type DataSetType[BasicDataUnitType[Record]].
> >     at org.apache.wayang.core.plan.wayangplan.Operator.connectTo(Operator.java:206)
> >     at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor.visit(WayangProjectVisitor.java:42)
> >     at org.apache.wayang.api.sql.calcite.converter.WayangRelConverter.convert(WayangRelConverter.java:16)
> >     at org.apache.wayang.api.sql.calcite.optimizer.Optimizer.convert(Optimizer.java:202)
> >     at org.apache.wayang.api.sql.context.SqlContext.executeSql(SqlContext.java:92)
> >     at SqlAPI.examplePostgres(SqlAPI.java:46)
> >     at SqlAPI.main(SqlAPI.java:77)
> >
> > We tried changing the Wayang MapOperator inputTypeClass to be a Tuple2,
> > but this does not fix the problem.
> >
> > Can anyone explain why the JoinOperator outputs a Tuple2, and perhaps
> > also how we then connect the output of the join to the input of the
> > projection?
> >
> > Best,
> > Michelle and Kristian
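Jorge's point that the operator after the join has to materialise each match can be illustrated in plain Java. This sketch models the tuples as `List<Object>` and takes the join attribute positions from his example (index 0 in the R tuple, index 1 in the S tuple). `JoinMaterialization`, `concatAll` and `concatDropRightKey` are hypothetical helpers, not Wayang API. Which variant the SQL projection should build remains the developer's choice, as the thread explains.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class JoinMaterialization {
    // All fields of both sides, join attribute repeated: <a, b, c, d, e, a, f>
    static List<Object> concatAll(List<Object> left, List<Object> right) {
        List<Object> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    // Both sides, but the right-hand copy of the join attribute is skipped: <a, b, c, d, e, f>
    static List<Object> concatDropRightKey(List<Object> left, List<Object> right, int rightKeyIndex) {
        List<Object> out = new ArrayList<>(left);
        for (int i = 0; i < right.size(); i++) {
            if (i != rightKeyIndex) {
                out.add(right.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> r = Arrays.asList("a", "b", "c", "d"); // tuple from R, join attribute at index 0
        List<Object> s = Arrays.asList("e", "a", "f");      // tuple from S, join attribute at index 1
        System.out.println(concatAll(r, s));             // [a, b, c, d, e, a, f]
        System.out.println(concatDropRightKey(r, s, 1)); // [a, b, c, d, e, f]
        System.out.println(r);                           // left side only: [a, b, c, d]
        System.out.println(s);                           // right side only: [e, a, f]
    }
}
```

Keeping every field (the first variant) preserves the field positions that a downstream projection computes over the combined left-then-right layout, which is likely why it is the simplest fit for an SQL projection.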
