Hi all,

As you may know, we're working on BeamSQL to execute SQL queries as a Beam pipeline. This is a valuable feature, available not only as a packaged CLI but also as part of the SDK for assembling a pipeline.

I have prepared a document [1] that lists the high-level APIs and shows how SQL queries can be added to a pipeline. Below is a pseudocode snippet for quick reference:

    PipelineOptions options = PipelineOptionsFactory...
    Pipeline pipeline = Pipeline.create(options);

    // prepare the BeamSQL environment
    BeamSQLEnvironment sqlEnv = BeamSQLEnvironment.create(pipeline);

    // register table metadata
    sqlEnv.addTableMetadata(String tableName, BeamSqlTable tableMetadata);

    // register a UDF
    sqlEnv.registerUDF(String functionName, Method udfMethod);

    // explain a SQL statement (SELECT only) and return the result as a PCollection
    PCollection<BeamSQLRow> phase1Stream = sqlEnv.explainSQL(String sqlStatement);

    // a PCollection explained by BeamSQL can be registered as a table and queried in turn
    sqlEnv.registerPCollectionAsTable(String tableName, phase1Stream);

    // apply more queries, even ones based on phase1Stream

    pipeline.run().waitUntilFinish();

Any feedback is very welcome!

[1] https://docs.google.com/document/d/1uWXL_yF3UUO5GfCxbL6kWsmC8xCWfICU3RwiQKsk7Mk/edit?usp=sharing

--
----
Mingmin