Hi Team,

I am working on Iceberg in-process compaction, and I am trying to use a SQL window join to compare 2 streams. First, I put each stream into 1-second tumbling windows like this:
    Table fsFiles = tEnv.sqlQuery(
        "SELECT runId, location, window_start, window_end " +
        "FROM TABLE(" +
        "TUMBLE(" +
        " TABLE " + fileSystemFilesTable + "," +
        " DESCRIPTOR(ts), " +
        "INTERVAL '1' SECONDS))");
    Table tableFiles = tEnv.sqlQuery(
        "SELECT runId, location, window_start, window_end " +
        "FROM TABLE(" +
        "TUMBLE(" +
        " TABLE " + tableFilesTable + "," +
        " DESCRIPTOR(ts), " +
        "INTERVAL '1' SECONDS))");

Then I print out these streams with the following code, and I see the values in the logs:

    tEnv.toDataStream(fsFiles).print("FS");
    tEnv.toDataStream(tableFiles).print("TS");

The result is:

    FS:2> +I[1705405510802, file:/var/folders/19/xs17kb0j7dj0klq324_vj7sc0000gn/T/junit13711198986865553391/db.db/test_table_with_pk/metadata/00000-b717c629-bb71-48df-a30b-615aeb320aec.metadata.json, 2024-01-16T11:45:10, 2024-01-16T11:45:11]
    [..]
    TS:2> +I[1705405510802, file:/var/folders/19/xs17kb0j7dj0klq324_vj7sc0000gn/T/junit13711198986865553391/db.db/test_table_unpartitioned/metadata/snap-532818363465442978-1-dc47e70d-82eb-490a-a21d-c032b88c3303.avro, 2024-01-16T11:45:10, 2024-01-16T11:45:11]
    [..]

So, as I expected, the 2 streams periodically emit the incoming data files with their runIds and window timestamps.
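To make the intended semantics concrete, here is a minimal plain-Java sketch (no Flink involved) of what the windowed queries are meant to compute. A 1-second TUMBLE assigns each row to the window [window_start, window_start + 1s), where window_start is the event time rounded down to a whole second; the anti join is meant to keep, per window, the FS locations that have no equal location in the table stream for the same window_start/window_end. The `FileEvent` record, the method names and the sample data are hypothetical (only the epoch-millisecond value 1705405510802 is borrowed from the runId in the output above). The sketch ignores NULL handling and duplicate rows, and it does not model watermarks, so it says nothing about when Flink actually fires a window join.

```java
import java.time.Instant;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class TumbleAntiJoinSketch {

    // Hypothetical row type: event time in epoch millis plus a file location.
    record FileEvent(long tsMillis, String location) {}

    // Start of the tumbling window (no offset) that contains tsMillis.
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    // Groups locations by window_start; duplicates collapse into one entry.
    static Map<Long, Set<String>> byWindow(List<FileEvent> events, long sizeMillis) {
        Map<Long, Set<String>> windows = new TreeMap<>();
        for (FileEvent e : events) {
            windows.computeIfAbsent(windowStart(e.tsMillis(), sizeMillis), k -> new LinkedHashSet<>())
                   .add(e.location());
        }
        return windows;
    }

    // Per window: left locations with no equal location on the right in the same window.
    // Windows with nothing missing are left out of the result.
    static Map<Long, Set<String>> antiJoin(List<FileEvent> left, List<FileEvent> right, long sizeMillis) {
        Map<Long, Set<String>> rightWindows = byWindow(right, sizeMillis);
        Map<Long, Set<String>> result = new TreeMap<>();
        byWindow(left, sizeMillis).forEach((start, locations) -> {
            Set<String> missing = new LinkedHashSet<>(locations);
            missing.removeAll(rightWindows.getOrDefault(start, Set.of()));
            if (!missing.isEmpty()) {
                result.put(start, missing);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        long size = 1_000L; // INTERVAL '1' SECONDS
        // Hypothetical sample data, not taken from the actual test run.
        List<FileEvent> fs = List.of(
            new FileEvent(1705405510802L, "a.avro"),
            new FileEvent(1705405510900L, "b.avro"),
            new FileEvent(1705405511100L, "c.avro"));
        List<FileEvent> table = List.of(
            new FileEvent(1705405510850L, "a.avro"),
            new FileEvent(1705405511200L, "c.avro"));
        antiJoin(fs, table, size).forEach((start, missing) ->
            System.out.println("[" + Instant.ofEpochMilli(start) + ", "
                + Instant.ofEpochMilli(start + size) + ") missing " + missing));
    }
}
```

Running `main` (Java 16+ for the record) prints a single line, `[2024-01-16T11:45:10Z, 2024-01-16T11:45:11Z) missing [b.avro]`: b.avro has no counterpart in the first window, while c.avro appears in both streams in the second window. The window bounds are printed in UTC.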
Now, I try to run an ANTI JOIN on these streams:

    Table missingFiles = tEnv.sqlQuery(
        " SELECT ts, location\n" +
        " FROM (\n" +
        "   SELECT * FROM TABLE(TUMBLE(TABLE " + fsFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +
        " ) L WHERE L.location NOT IN (\n" +
        "   SELECT location FROM ( \n" +
        "     SELECT * FROM TABLE(TUMBLE(TABLE " + tableFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +
        "   ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end)");

And even though the logs show that some files are missing, I do not see any records in the logs for the missing table:

    tEnv.toDataStream(missingFiles).print("MISSING");

Just to try a different way of solving/checking this, I tried an outer join (a LEFT JOIN in the query below) to see how the join works:

    Table joined = tEnv.sqlQuery(
        " SELECT fs_files.location AS fs_location, table_files.location AS table_location,\n" +
        "   COALESCE(fs_files.window_start, table_files.window_start) as window_start,\n" +
        "   COALESCE(fs_files.window_end, table_files.window_end) as window_end\n" +
        " FROM (SELECT * FROM TABLE(TUMBLE(TABLE " + fsFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))) fs_files\n" +
        " LEFT JOIN (SELECT * FROM TABLE(TUMBLE(TABLE " + tableFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))) table_files\n" +
        " ON fs_files.location = table_files.location AND\n" +
        "   fs_files.window_start = table_files.window_start AND \n" +
        "   fs_files.window_end = table_files.window_end\n");

There is nothing in the logs for the join either. I think I might be missing something around the windowing, and the joined windows are not triggered by these more complex queries, but I am stuck at the moment, so any help would be appreciated.

Thanks,
Peter