Hi Team,

I am working on Iceberg in-process compaction and am trying to use a SQL window
join to compare two streams. First, I apply a tumbling window to each stream
like this:

    Table fsFiles = tEnv.sqlQuery(
        "SELECT runId, location, window_start, window_end " +
        "FROM TABLE(" +
        "    TUMBLE(" +
        "        TABLE " + fileSystemFilesTable + "," +
        "        DESCRIPTOR(ts), " +
        "        INTERVAL '1' SECONDS))");

    Table tableFiles = tEnv.sqlQuery(
        "SELECT runId, location, window_start, window_end " +
        "FROM TABLE(" +
        "    TUMBLE(" +
        "        TABLE " + tableFilesTable + "," +
        "        DESCRIPTOR(ts), " +
        "        INTERVAL '1' SECONDS))");

When I print these streams with the following code, I see the values in the
logs:

    tEnv.toDataStream(fsFiles).print("FS");
    tEnv.toDataStream(tableFiles).print("TS");

The result is:

    FS:2> +I[1705405510802, file:/var/folders/19/xs17kb0j7dj0klq324_vj7sc0000gn/T/junit13711198986865553391/db.db/test_table_with_pk/metadata/00000-b717c629-bb71-48df-a30b-615aeb320aec.metadata.json, 2024-01-16T11:45:10, 2024-01-16T11:45:11]
    [..]
    TS:2> +I[1705405510802, file:/var/folders/19/xs17kb0j7dj0klq324_vj7sc0000gn/T/junit13711198986865553391/db.db/test_table_unpartitioned/metadata/snap-532818363465442978-1-dc47e70d-82eb-490a-a21d-c032b88c3303.avro, 2024-01-16T11:45:10, 2024-01-16T11:45:11]
    [..]

So, as I expected, the two streams periodically emit the incoming data files
with their runIds and window timestamps.
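To make the window columns in the log easier to read, here is a plain-Java model (not Flink code) of how a 1-second tumbling window with no offset assigns window_start and window_end: the start is the timestamp rounded down to a multiple of the window size. The class and method names are hypothetical, and the runId from the log is used only as a sample epoch-millisecond value; read that way (in UTC), it falls in the [11:45:10, 11:45:11) window printed above.

```java
import java.time.Instant;

/**
 * Plain-Java model (not Flink code) of tumbling-window assignment with no offset:
 * a row belongs to the window [start, start + size) where start is its timestamp
 * rounded down to a multiple of the window size.
 */
public class TumbleWindowModel {

    /** Inclusive start of the tumbling window containing tsMillis. */
    static long windowStart(long tsMillis, long sizeMillis) {
        // floorMod keeps the rounding correct for negative timestamps too.
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    /** Exclusive end of the tumbling window containing tsMillis. */
    static long windowEnd(long tsMillis, long sizeMillis) {
        return windowStart(tsMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long ts = 1705405510802L; // sample value: the runId from the log, read as epoch millis
        long size = 1_000L;       // INTERVAL '1' SECONDS
        System.out.println(Instant.ofEpochMilli(windowStart(ts, size))); // 2024-01-16T11:45:10Z
        System.out.println(Instant.ofEpochMilli(windowEnd(ts, size)));   // 2024-01-16T11:45:11Z
    }
}
```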

Now, I try to run an ANTI JOIN on these streams:

    Table missingFiles = tEnv.sqlQuery(
        "SELECT ts, location\n" +
        "FROM (\n" +
        "    SELECT * FROM TABLE(TUMBLE(TABLE " + fsFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +
        ") L WHERE L.location NOT IN (\n" +
        "    SELECT location FROM (\n" +
        "        SELECT * FROM TABLE(TUMBLE(TABLE " + tableFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +
        "    ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end)");

Even though the logs show that some files are missing, I do not see any
records when I print the missing-files table:

    tEnv.toDataStream(missingFiles).print("MISSING");
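For reference, the result the NOT IN query is meant to produce can be written as a plain-Java model (not Flink code; FileRow, Key, and missing are hypothetical names, the data is made up, and records require Java 16 or later): a file-system row is reported when no table row has the same location in the same 1-second tumbling window.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Plain-Java model (not Flink code) of what the windowed NOT IN query is intended to
 * return: left rows whose location has no match on the right in the SAME tumbling window.
 */
public class WindowAntiJoinModel {

    /** Hypothetical input row: a file location and its event time in epoch millis. */
    record FileRow(String location, long tsMillis) {}

    /** Join key mirroring location plus window_start (window_end follows from the size). */
    record Key(long windowStart, String location) {}

    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    static List<String> missing(List<FileRow> left, List<FileRow> right, long sizeMillis) {
        Set<Key> rightKeys = new HashSet<>();
        for (FileRow r : right) {
            rightKeys.add(new Key(windowStart(r.tsMillis(), sizeMillis), r.location()));
        }
        List<String> out = new ArrayList<>();
        for (FileRow l : left) {
            // Keep the left row only if no right row shares its window and location.
            if (!rightKeys.contains(new Key(windowStart(l.tsMillis(), sizeMillis), l.location()))) {
                out.add(l.location());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<FileRow> fs = List.of(
                new FileRow("a.avro", 10_200L),  // matched in window [10000, 11000)
                new FileRow("b.avro", 10_700L),  // never committed to the table
                new FileRow("c.avro", 10_900L)); // committed, but it lands in the next window
        List<FileRow> table = List.of(
                new FileRow("a.avro", 10_400L),
                new FileRow("c.avro", 11_100L));
        System.out.println(missing(fs, table, 1_000L)); // [b.avro, c.avro]
    }
}
```

The c.avro row shows a side effect of the window_start/window_end equality in the subquery: a file whose table entry arrives in the adjacent window is reported as missing by this model.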


To try a different way of checking this, and to see how the join behaves, I
also tried an outer join (the query below uses a LEFT JOIN):

    Table joined = tEnv.sqlQuery(
        "SELECT fs_files.location AS fs_location, table_files.location AS table_location,\n" +
        "  COALESCE(fs_files.window_start, table_files.window_start) AS window_start,\n" +
        "  COALESCE(fs_files.window_end, table_files.window_end) AS window_end\n" +
        "FROM (SELECT * FROM TABLE(TUMBLE(TABLE " + fsFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))) fs_files\n" +
        "LEFT JOIN (SELECT * FROM TABLE(TUMBLE(TABLE " + tableFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))) table_files\n" +
        "ON fs_files.location = table_files.location AND\n" +
        "   fs_files.window_start = table_files.window_start AND\n" +
        "   fs_files.window_end = table_files.window_end\n");

And there is nothing in the logs for the join either.
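The outer join relates to the anti-join in a simple way: with a windowed LEFT JOIN every file-system row is kept, and table_location is null when there is no partner in the same window, so filtering those rows should give the same files as the NOT IN query. A sketch of that relationship in plain Java (hypothetical names and sample data, not Flink's join implementation; records require Java 16 or later):

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (not Flink code) of the windowed LEFT JOIN: every left row is kept,
 * and tableLocation is null when no right row has the same location in the same window.
 */
public class WindowLeftJoinModel {

    record FileRow(String location, long tsMillis) {}

    /** One output row, mirroring (fs_location, table_location, window_start). */
    record Joined(String fsLocation, String tableLocation, long windowStart) {}

    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    static List<Joined> leftJoin(List<FileRow> left, List<FileRow> right, long sizeMillis) {
        List<Joined> out = new ArrayList<>();
        for (FileRow l : left) {
            long ws = windowStart(l.tsMillis(), sizeMillis);
            boolean matched = false;
            for (FileRow r : right) {
                // Same location and same window (equal start implies equal end here).
                if (r.location().equals(l.location()) && windowStart(r.tsMillis(), sizeMillis) == ws) {
                    out.add(new Joined(l.location(), r.location(), ws));
                    matched = true;
                }
            }
            if (!matched) {
                out.add(new Joined(l.location(), null, ws)); // no partner in this window
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<FileRow> fs = List.of(new FileRow("a.avro", 10_200L), new FileRow("b.avro", 10_700L));
        List<FileRow> table = List.of(new FileRow("a.avro", 10_400L));
        // Rows with a null table_location are the ones the anti-join should report.
        leftJoin(fs, table, 1_000L).stream()
                .filter(j -> j.tableLocation() == null)
                .forEach(j -> System.out.println(j.fsLocation())); // b.avro
    }
}
```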

I think I might be missing something about the windowing, and my joined
windows are not being triggered in the more complex queries, but I am stuck
at the moment, so any help would be appreciated.
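The suspicion that the joined windows are never triggered can be framed with a toy model built on two points: a window join emits only final results at the end of a window, and an operator with two inputs advances its watermark to the minimum of its input watermarks. The sketch below is a simplification under those assumptions (class and method names are hypothetical), not a diagnosis of this job. It shows that if either input's watermark stays before a window's end, that window is never emitted, which could be consistent with the single-stream queries printing rows while the joins print nothing.

```java
/**
 * Toy model (a simplification, not Flink's implementation) of when a two-input
 * window join may emit the window [start, end).
 */
public class JoinWatermarkModel {

    /** An operator with two inputs advances its event-time clock to the slower input. */
    static long combinedWatermark(long leftWatermark, long rightWatermark) {
        return Math.min(leftWatermark, rightWatermark);
    }

    /**
     * A watermark W means no more rows with timestamp <= W are expected, so the window
     * [start, end) is complete once the combined watermark reaches end - 1.
     */
    static boolean windowFires(long windowEndMillis, long leftWatermark, long rightWatermark) {
        return combinedWatermark(leftWatermark, rightWatermark) >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 1705405511000L; // 2024-01-16T11:45:11Z, the window end seen in the log
        // Both inputs have moved past the window end: the window can be emitted.
        System.out.println(windowFires(windowEnd, 1705405512000L, 1705405512000L)); // true
        // One input's watermark never advances: the combined watermark stays
        // behind and the window never fires.
        System.out.println(windowFires(windowEnd, 1705405512000L, Long.MIN_VALUE)); // false
    }
}
```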

Thanks,
Peter
