Jacob Ferriero created BEAM-10544:
-------------------------------------

             Summary: Select Types not equal with nested schema
                 Key: BEAM-10544
                 URL: https://issues.apache.org/jira/browse/BEAM-10544
             Project: Beam
          Issue Type: Bug
          Components: dsl-sql, sdk-java-core
            Reporter: Jacob Ferriero


When using SqlTransform to join a table with a large nested schema to a flat table, we get 
a "Types not equal" error from Select [1].

We are not able to get the test of our SqlTransform usage to pass with the direct runner. 
All code is checked into CSR [2].

Things of note:
- Query planner: Calcite

Query (the real business logic is much more complex, but this is sufficient to 
reproduce the issue in our test):
```sql
SELECT
    t1.DeviceName AS DeviceName,
    t1.LinkName AS LinkName,
    t1.HostName AS HostName,
    t1.MeasuredAt AS MeasuredAt,
    t2.b_dBm AS b_dBm
FROM
    RealtimeRows AS t1
  INNER JOIN
  --BigQuery Dimension Side Input
    TxPowerSideInput AS t2
  ON
    t1.DeviceName = t2.DeviceName
```

Tables are created like so in the test (in the real-time pipeline, the data comes from Pub/Sub):
```java
    // This table has the same schema as the real incoming Pub/Sub messages
    // in the real-world use case.
    PCollection<Row> realtimeTestData = pipeline
        .apply("Read 1Hz staging",
            BigQueryIO
                .readTableRowsWithSchema()
                .fromQuery(
                    "SELECT * FROM `taara-db.jake_views.staging_sample_float`")
                .usingStandardSql())
        .apply(Convert.toRows());

    PCollection<Row> txPowerCalcRows = pipeline
        .apply("Read Tx Power Calc Side Input",
            BigQueryIO
                .readTableRowsWithSchema()
                .fromQuery(
                    "SELECT * FROM "
                        + "`taara-db`.MANUFACTURING.tx_power_timeinvariant_calculations")
                .usingStandardSql())
        .apply(Convert.toRows());
```
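To take BigQuery out of the reproduction, the two inputs could instead be built in memory with `Create`. This is only a sketch under stated assumptions: the class `InMemoryTables`, the nested `Metrics` field, the field types, and all values are hypothetical (the real staging schema is not included in this issue); only the column names referenced by the query come from the report. It has not been verified that this triggers the same "Types not equal" error.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;

/** Hypothetical in-memory stand-ins for the two BigQuery-backed test tables. */
public class InMemoryTables {

  // Hypothetical nested struct; the real staging table's nested fields are not shown.
  static final Schema METRICS_SCHEMA =
      Schema.builder().addDoubleField("rx_dBm").addDoubleField("tx_dBm").build();

  // Top-level columns referenced by the query, plus one nested ROW field.
  static final Schema REALTIME_SCHEMA =
      Schema.builder()
          .addStringField("DeviceName")
          .addStringField("LinkName")
          .addStringField("HostName")
          .addDateTimeField("MeasuredAt")
          .addRowField("Metrics", METRICS_SCHEMA)
          .build();

  // Flat dimension table with only the columns the query uses (types assumed).
  static final Schema TX_POWER_SCHEMA =
      Schema.builder().addStringField("DeviceName").addDoubleField("b_dBm").build();

  static Row realtimeRow(String device) {
    Row metrics = Row.withSchema(METRICS_SCHEMA).addValues(-12.5, 3.0).build();
    return Row.withSchema(REALTIME_SCHEMA)
        .addValues(device, "link-1", "host-1", new Instant(0L), metrics)
        .build();
  }

  static Row txPowerRow(String device) {
    return Row.withSchema(TX_POWER_SCHEMA).addValues(device, 1.5).build();
  }

  // Schema-aware PCollections, usable as SqlTransform inputs in place of the BigQuery reads.
  static PCollection<Row> realtimeRows(Pipeline pipeline) {
    return pipeline.apply(
        "Create realtime rows",
        Create.of(realtimeRow("device-a")).withRowSchema(REALTIME_SCHEMA));
  }

  static PCollection<Row> txPowerRows(Pipeline pipeline) {
    return pipeline.apply(
        "Create tx power rows",
        Create.of(txPowerRow("device-a")).withRowSchema(TX_POWER_SCHEMA));
  }
}
```

Matching `DeviceName` values on both sides means the INNER JOIN should produce at least one row if the pipeline gets past schema validation.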

Relevant Java snippet:
```java
    PCollection<Row> out = tables
        .apply(
            "Join to dimension Data",
            SqlTransform
                .query(sql)
                .registerUdf("POW", Pow.class)
                .registerUdf("SQRT", Sqrt.class)
                .registerUdf("LOG10", Log10.class)
                .registerUdf("GREATEST", Greatest.class)
                .registerUdf("EXTRACT_OFFSET", ExtractArrayOffset.class)
                .registerUdf("PARSE_TIMESTAMP", ParseTimestamp.class)
                .registerUdf("UNIX_SECONDS", UnixSeconds.class)
        );
```
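When `SqlTransform` is applied to a `PCollectionTuple`, each `PCollection` in the tuple is registered as a table named after its `TupleTag` id, so the tags have to match `RealtimeRows` and `TxPowerSideInput` from the query. A minimal sketch of how `tables` might be assembled, assuming that is what the test does; the class `SqlInputs` and its method are hypothetical names.

```java
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

/** Hypothetical helper that assembles the `tables` input for SqlTransform. */
public class SqlInputs {
  // Tag ids become SQL table names, so they must match the FROM / JOIN clauses.
  static final TupleTag<Row> REALTIME_ROWS = new TupleTag<>("RealtimeRows");
  static final TupleTag<Row> TX_POWER_SIDE_INPUT = new TupleTag<>("TxPowerSideInput");

  static PCollectionTuple tables(PCollection<Row> realtime, PCollection<Row> txPower) {
    return PCollectionTuple.of(REALTIME_ROWS, realtime).and(TX_POWER_SIDE_INPUT, txPower);
  }
}
```

With the variables from the table-creation snippet, usage would be `SqlInputs.tables(realtimeTestData, txPowerCalcRows)`.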

[1] https://github.com/apache/beam/blob/b564239081e9351c56fb0e7d263495b95dd3f8f3/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java#L203
[2] https://source.cloud.google.com/taara-db/pso-taara-realtime-margin/+/master:streaming-join/streaming-join/src/test/java/com/google/x/taara/dataflow/transforms/RxTxPowersCorrFERCombinedSqlTransformIT.java




--
This message was sent by Atlassian Jira
(v8.3.4#803005)