[ 
https://issues.apache.org/jira/browse/BEAM-4044?focusedWorklogId=97993&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97993
 ]

ASF GitHub Bot logged work on BEAM-4044:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/May/18 18:16
            Start Date: 03/May/18 18:16
    Worklog Time Spent: 10m 
      Work Description: akedin commented on a change in pull request #5224: 
[BEAM-4044] [SQL] Add tables via TableStore in Schema, execute DDL in Calcite 
model
URL: https://github.com/apache/beam/pull/5224#discussion_r185888542
 
 

 ##########
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
 ##########
 @@ -78,25 +76,18 @@
     }
   }
 
-  private PCollectionTuple toPCollectionTuple(PInput inputs) {
-    return (inputs instanceof PCollection)
-        ? PCollectionTuple.of(new TupleTag<>(PCOLLECTION_NAME), toRows(inputs))
-        : tupleOfAllInputs(inputs.getPipeline(), inputs.expand());
-  }
-
-  private PCollectionTuple tupleOfAllInputs(
-      Pipeline pipeline,
-      Map<TupleTag<?>, PValue> taggedInputs) {
-
-    PCollectionTuple tuple = PCollectionTuple.empty(pipeline);
-
-    for (Map.Entry<TupleTag<?>, PValue> input : taggedInputs.entrySet()) {
-      tuple = tuple.and(
-          new TupleTag<>(input.getKey().getId()),
-          toRows(input.getValue()));
+  private BeamSqlTableProvider toTableProvider(PInput inputs) {
+    ImmutableMap.Builder<String, BeamSqlTable> tables = ImmutableMap.builder();
+    if (inputs instanceof PCollection) {
+      tables.put(PCOLLECTION_NAME,
+          new BeamPCollectionTable(toRows(inputs)));
+    } else {
+      for (Map.Entry<TupleTag<?>, PValue> input : inputs.expand().entrySet()) {
+        tables.put(input.getKey().getId(),
+            new BeamPCollectionTable(toRows(input.getValue())));
+      }
 
 Review comment:
   nit: I would avoid stateful if/else with loops with generics, hurts 
readability. Might consider extracting something like this:
   
   ```java
   if (input instanceof PCollection) {
     return 
        ImmuableMap.of(
           PCOLLECTION_NAME, 
           new BeamPCollectionTable(toRows(inputs)))
   }
   
   return
       inputs
           .expand()
           .entrySet()
           .stream()
           .collect(
               toMap(
                   keyedPCollection -> keyedPCollection.getKey().getId(),
                   keyedPCollection -> keyedPCollection.getValue()))
   ```
   
   and then create BeamSqlTableProvider outside

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 97993)
    Time Spent: 10.5h  (was: 10h 20m)

> Take advantage of Calcite DDL
> -----------------------------
>
>                 Key: BEAM-4044
>                 URL: https://issues.apache.org/jira/browse/BEAM-4044
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Andrew Pilloud
>            Assignee: Andrew Pilloud
>            Priority: Major
>          Time Spent: 10.5h
>  Remaining Estimate: 0h
>
> In Calcite 1.15 support for abstract DDL moved into calcite core. We should 
> take advantage of that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to