[ 
https://issues.apache.org/jira/browse/BEAM-7513?focusedWorklogId=260634&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-260634
 ]

ASF GitHub Bot logged work on BEAM-7513:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Jun/19 18:38
            Start Date: 14/Jun/19 18:38
    Worklog Time Spent: 10m 
      Work Description: akedin commented on pull request #8822: [BEAM-7513] 
Adding Row Count for Bigquery Table
URL: https://github.com/apache/beam/pull/8822#discussion_r293928787
 
 

 ##########
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
 ##########
 @@ -141,17 +144,34 @@ public Connection connect(String url, Properties info) 
throws SQLException {
    * not this path. The CLI ends up using the schema factory that populates 
the default schema with
    * all table providers it can find. See {@link BeamCalciteSchemaFactory}.
    */
-  public static JdbcConnection connect(TableProvider tableProvider) {
+  public static JdbcConnection connect(TableProvider tableProvider, 
PipelineOptions options) {
     try {
       Properties properties = new Properties();
       properties.setProperty(
           SCHEMA_FACTORY.camelName(), 
BeamCalciteSchemaFactory.Empty.class.getName());
       JdbcConnection connection =
           (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties);
       connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider);
+      connection.setPipelineOptionsMap(getOptionsMap(options));
       return connection;
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
+
+  /** Converts {@link PipelineOptions} to its map format. */
+  private static Map<String, String> getOptionsMap(PipelineOptions options) {
+    ObjectMapper objectMapper = new ObjectMapper();
+    Map map = objectMapper.convertValue(options, Map.class);
+    map = (Map) map.get("options");
+    if (map == null) {
+      map = new HashMap();
+    }
+    Map<String, String> map2 = new HashMap<>();
+    for (Object entry : map.entrySet()) {
+      Map.Entry ent = (Map.Entry) entry;
+      map2.put(ent.getKey().toString(), ent.getValue().toString());
 
 Review comment:
   `getValue().toString()` is probably wrong. I think if you have a complex 
object there, it would be converted to a `Map<>` by the `ObjectMapper` and then 
`toString()` would return something like `{key=value}` which is not a valid 
JSON, so we won't be able to parse it back. What has to be done is we should 
use the same mapper to convert the value to JSON string instead, probably 
something along these lines:
   
   ```java
   optionsMap = map.entrySet()
    .stream()
    .collect(toMap(MapEntry::getKey, e-> 
OBJECT_MAPPER.writeValueAsString(e.getValue())))
   ```
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 260634)
    Time Spent: 1h 20m  (was: 1h 10m)

> Row Estimation for BigQueryTable
> --------------------------------
>
>                 Key: BEAM-7513
>                 URL: https://issues.apache.org/jira/browse/BEAM-7513
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql, io-java-gcp
>            Reporter: Alireza Samadianzakaria
>            Assignee: Alireza Samadianzakaria
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Calcite tables (org.apache.calcite.schema.Table) should implement the method 
> org.apache.calcite.schema.Statistic getStatistic(). The Statistic instance 
> returned by this method is used for the Volcano optimizer in Calcite. 
> Currently, org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable has not 
> implemented getStatistic() which means it uses the implementation in 
> org.apache.calcite.schema.impl.AbstractTable and that implementation just 
> returns Statistics.UNKNOWN for all sources.
>  
> Things needed to be implemented:
> 1- Implementing getStatistic in BeamCalciteTable such that it calls a row 
> count estimation method from BeamSqlTable and adding this method to 
> BeamSqlTable.
> 2- Implementing the row count estimation method for BigQueryTable. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to