In the meantime, maybe you could look at the implementation we used during our
thesis? It is not complete by any stretch, but it worked well enough for the
Join Order Benchmark.

https://github.com/Mikkel-MJ/incubator-wayang-thesis/blob/main/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java
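
For orientation, here is a minimal sketch of one way a JDBC stage with two table sources feeding a join could be collapsed into a single SQL statement. It is not taken from the thesis code or from Wayang. `JoinSqlSketch`, `TableRef`, and `buildJoinQuery` are hypothetical names, and the sketch assumes both sources live in the same database. Note that in the test below, both key descriptors name the same table (`imdb_top_1000`). A generated self-join therefore needs a distinct alias per source, or the column references in the ON clause become ambiguous.

```java
import java.util.List;
import java.util.stream.Collectors;

public class JoinSqlSketch {

    // Hypothetical description of one table source in a JDBC stage
    // (not a Wayang class).
    record TableRef(String table, List<String> columns) {}

    // Builds a single SELECT ... JOIN ... ON ... statement for two sources.
    // Each source gets its own alias (t0, t1), so a self-join over the same
    // table is not ambiguous.
    static String buildJoinQuery(TableRef left, String leftKey,
                                 TableRef right, String rightKey) {
        String projection = project("t0", left) + ", " + project("t1", right);
        return "SELECT " + projection
                + " FROM " + left.table() + " AS t0"
                + " JOIN " + right.table() + " AS t1"
                + " ON t0." + leftKey + " = t1." + rightKey;
    }

    // Qualifies every column of a source with its alias.
    private static String project(String alias, TableRef ref) {
        return ref.columns().stream()
                .map(column -> alias + "." + column)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        TableRef imdb = new TableRef("imdb_top_1000",
                List.of("series_title", "released_year"));
        System.out.println(
                buildJoinQuery(imdb, "series_title", imdb, "series_title"));
    }
}
```

Running `main` prints one query joining `imdb_top_1000 AS t0` with `imdb_top_1000 AS t1` on `series_title`. A real executor would also have to map the result columns back into two `Record`s per output `Tuple2`.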

On 2025/12/12 14:58:20 "Treykorn, Felix" wrote:
> Hello everyone,
> we are three students from the Hasso Plattner Institute, Potsdam, who are
> working on a contribution to the Wayang project. Zoi Kaoudi helps us when we
> have questions, and she pointed us to the dev list for this one. In our work
> we noticed that the JdbcExecutor does not yet support multiple sources for an
> operator. For example, the following code throws "Invalid jdbc stage:
> multiple sources are not currently supported":
>
> @Test
> void testJoinDbSources() {
>    WayangContext wayangContext = getTestWayangContext()
>            .withPlugin(Java.basicPlugin())
>            .withPlugin(Spark.basicPlugin())
>            .withPlugin(Postgres.plugin());
> 
>    // Two logical sources over the same table.
>    TableSource table1 = new PostgresTableSource("imdb_top_1000",
>            "series_title", "released_year", "certificate", "runtime", "genre",
>            "imdb_rating", "overview", "meta_score", "director", "star1",
>            "star2", "star3", "star4", "no_of_votes", "gross");
>    TableSource table2 = new PostgresTableSource("imdb_top_1000",
>            "series_title", "released_year", "certificate", "runtime", "genre",
>            "imdb_rating", "overview", "meta_score", "director", "star1",
>            "star2", "star3", "star4", "no_of_votes", "gross");
> 
>    // Join on series_title
>    JoinOperator<Record, Record, String> joinOperator =
>            new JoinOperator<>(
>                    record -> record.getString(0),
>                    record -> record.getString(0),
>                    Record.class,
>                    Record.class,
>                    String.class
>            );
> 
>    joinOperator.getKeyDescriptor0()
>            .withSqlImplementation("imdb_top_1000", "series_title");
>    joinOperator.getKeyDescriptor1()
>            .withSqlImplementation("imdb_top_1000", "series_title");
> 
>    joinOperator.addTargetPlatform(Postgres.platform());
> 
>    // Wire up both DB sources as inputs to the join.
>    table1.connectTo(0, joinOperator, 0);
>    table2.connectTo(0, joinOperator, 1);
> 
>    // Collect results.
>    Collection<Tuple2<Record, Record>> collector = new ArrayList<>();
>    LocalCallbackSink<Tuple2<Record, Record>> sink =
>            LocalCallbackSink.createCollectingSink(
>                    collector,
>                    DataSetType.createDefaultUnchecked(Tuple2.class)
>            );
>    joinOperator.connectTo(0, sink, 0);
>    // Execute the plan.
>    wayangContext.execute("PostgreSql join DB-DB", new WayangPlan(sink));
> 
>    // Basic sanity check: we should get at least self-joins.
>    assertFalse(collector.isEmpty(), "Join result should not be empty.");
> }
> 
> 
> For our contribution, we want to be able to use an operator similar to the
> Join on the Postgres platform with multiple sources. Is there already
> existing code that extends the JdbcExecutor with this functionality?
> 
> Best,
> Anton, Max and Felix
> 
