[ 
https://issues.apache.org/jira/browse/BEAM-14026?focusedWorklogId=736699&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-736699
 ]

ASF GitHub Bot logged work on BEAM-14026:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Mar/22 15:02
            Start Date: 04/Mar/22 15:02
    Worklog Time Spent: 10m 
      Work Description: abhijeet-lele commented on a change in pull request 
#16988:
URL: https://github.com/apache/beam/pull/16988#discussion_r819645977



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
##########
@@ -108,11 +109,32 @@
   /** Return the list of data values. */
   public abstract List<Object> getValues();
 
+  /** This is a recursive call to get all the values of the nested rows.
+  The recursion is bounded by the amount of nesting within the data.
+   This mirrors the unnest behavior of Calcite towards the schema. **/
+  public List<Object> getNestedRowBaseValues() {
+    return IntStream.range(0, getFieldCount())
+            .mapToObj(i -> {
+              List<Object> values = new ArrayList<>();
+              FieldType fieldType = this.getSchema().getField(i).getType();
+              if(fieldType.getTypeName().equals(TypeName.ROW)) {
+                Row row = this.getBaseValue(i, Row.class);
+                List<Object> rowValues = row.getNestedRowBaseValues();
+                if(null != rowValues) {
+                  values.addAll(rowValues);
+                }
+              } else {
+                values.add(this.getBaseValue(i));
+              }
+              return values.stream();
+            }).flatMap(Function.identity()).collect(Collectors.toList());
+  }

Review comment:
       Moved the logic to BeamUnnestRel.java.
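
The recursive flattening in the diff above can be sketched without any Beam dependency. This is only a minimal illustration, not the code that ended up in BeamUnnestRel.java: a row is modeled as a plain `List<Object>`, a nested row as a nested `List`, and the names `FlattenSketch` and `flatten` are hypothetical. Unlike the diff, which checks `TypeName.ROW`, this model cannot tell a nested row from an array field, so it should be read as a sketch of the recursion only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlattenSketch {
  /**
   * Recursively flattens a row. A nested row is modeled as a nested List
   * (an assumption of this sketch; Beam uses Row and a schema check).
   * The recursion depth is bounded by the nesting depth of the data.
   */
  public static List<Object> flatten(List<?> row) {
    List<Object> out = new ArrayList<>();
    for (Object value : row) {
      if (value instanceof List) {
        // Nested row: splice its flattened values in place.
        out.addAll(flatten((List<?>) value));
      } else {
        // Leaf value: keep as is.
        out.add(value);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Object> level3 = Arrays.asList(1, "row", 1.0);
    List<Object> level2 = Arrays.asList(1, "row", level3, 1.0);
    System.out.println(flatten(level2)); // [1, row, 1, row, 1.0, 1.0]
  }
}
```

The four top-level values of the level-2 row become six values once the nested level-3 row is expanded in place.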






Issue Time Tracking
-------------------

    Worklog Id:     (was: 736699)
    Time Spent: 0.5h  (was: 20m)

> Using unnest on a array with rows having nested rows throws 
> IllegalArgumentException
> ------------------------------------------------------------------------------------
>
>                 Key: BEAM-14026
>                 URL: https://issues.apache.org/jira/browse/BEAM-14026
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql, sdk-java-core
>    Affects Versions: 2.36.0
>            Reporter: Abhijeet
>            Priority: P2
>             Fix For: Not applicable
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Running unnest with a SQL statement such as
> "select t.a1, t.a2, t.a3, d.b1, d.b2, d.b4, d.b3.c1, d.b3.c2, d.b3.c3 from 
> test t cross join unnest(t.a4) d"
> on the following dataset throws an IllegalArgumentException.
> Java code snippet:
> ----
> // define the input row format level3
> Schema level3Type =
> Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();
> Row level3Row1 = Row.withSchema(level3Type).addValues(1, "row", 1.0).build();
> Row level3Row2 = Row.withSchema(level3Type).addValues(2, "row", 2.0).build();
> Row level3Row3 = Row.withSchema(level3Type).addValues(3, "row", 3.0).build();
> // define the input row format level2
> Schema level2Type =
> Schema.builder().addInt32Field("b1")
> .addStringField("b2")
> .addRowField("b3", level3Type)
> .addDoubleField("b4").build();
> Row level2Row1 = Row.withSchema(level2Type).addValues(1, "row", level3Row1, 
> 1.0).build();
> Row level2Row2 = Row.withSchema(level2Type).addValues(2, "row", level3Row2, 
> 2.0).build();
> Row level2Row3 = Row.withSchema(level2Type).addValues(3, "row", level3Row3, 
> 3.0).build();
> // define the input row format level1
> Schema level1Type =
> Schema.builder().addInt32Field("a1")
> .addStringField("a2")
> .addDoubleField("a3")
> .addArrayField("a4", Schema.FieldType.row(level2Type))
> .build();
> Row level1Row1 = Row.withSchema(level1Type).addValues(1, "row", 1.0,
> Arrays.asList(level2Row1, level2Row2, level2Row3)).build();
> Row level1Row2 = Row.withSchema(level1Type).addValues(2, "row", 2.0,
> Arrays.asList(level2Row1, level2Row2, level2Row3)).build();
> Row level1Row3 = Row.withSchema(level1Type).addValues(3, "row", 3.0,
> Arrays.asList(level2Row1, level2Row2, level2Row3)).build();
> // create a source PCollection with Create.of();
> PCollection<Row> inputTable =
> PBegin.in(p).apply(Create.of(level1Row1, level1Row2, 
> level1Row3).withRowSchema(level1Type));
> ----
> Throws the following exception:
> java.lang.IllegalArgumentException: Row expected 10 fields. initialized with 
> 8 fields
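
One plausible reading of the numbers in that exception, offered as an assumption rather than a confirmed diagnosis: the output row keeps the four outer fields (a1, a2, a3, a4), the expected schema expands the nested row b3 into its three leaf fields, and the supplied values still carry b3 as a single value. The sketch below only reproduces that arithmetic; `FieldCountSketch`, `Field`, `flattenedCount`, and `level2` are hypothetical names and do not come from Beam.

```java
import java.util.Arrays;
import java.util.List;

public class FieldCountSketch {
  /** A schema field; a non-empty children list marks a nested row. */
  public static final class Field {
    final String name;
    final List<Field> children;

    Field(String name, Field... children) {
      this.name = name;
      this.children = Arrays.asList(children);
    }
  }

  /** Counts fields with every nested row expanded into its leaf fields. */
  public static int flattenedCount(List<Field> fields) {
    int n = 0;
    for (Field f : fields) {
      n += f.children.isEmpty() ? 1 : flattenedCount(f.children);
    }
    return n;
  }

  /** The level-2 schema from the report: b1, b2, b3 (c1, c2, c3), b4. */
  public static List<Field> level2() {
    Field b3 = new Field("b3", new Field("c1"), new Field("c2"), new Field("c3"));
    return Arrays.asList(new Field("b1"), new Field("b2"), b3, new Field("b4"));
  }

  public static void main(String[] args) {
    int outer = 4; // a1, a2, a3, a4 (assumed to be retained in the output row)
    int expected = outer + flattenedCount(level2()); // 4 + 6
    int supplied = outer + level2().size(); // 4 + 4
    System.out.println("expected " + expected + ", supplied " + supplied);
    // prints "expected 10, supplied 8"
  }
}
```

Under this reading, flattening the nested row's values before the output row is built would bring the value count up to the ten fields the schema expects.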



