[ https://issues.apache.org/jira/browse/BEAM-6428?focusedWorklogId=186667&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186667 ]

ASF GitHub Bot logged work on BEAM-6428:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jan/19 02:11
            Start Date: 18/Jan/19 02:11
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on pull request #7545: [BEAM-6428] Add a textual selection syntax for schema fields.
URL: https://github.com/apache/beam/pull/7545#discussion_r248907626
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
 ##########
 @@ -97,89 +103,191 @@ private Select(FieldAccessDescriptor 
fieldAccessDescriptor) {
    * <p>This allows for nested fields to be selected as well.
    */
   public static <T> Select<T> fieldAccess(FieldAccessDescriptor 
fieldAccessDescriptor) {
-    return new Select(fieldAccessDescriptor);
+    return new Select<>(fieldAccessDescriptor);
   }
 
-  // TODO: Support Xpath or JsonPath as a way of describing fields.
-
   @Override
   public PCollection<Row> expand(PCollection<T> input) {
     Schema inputSchema = input.getSchema();
     FieldAccessDescriptor resolved = 
fieldAccessDescriptor.resolve(inputSchema);
     Schema outputSchema = getOutputSchema(inputSchema, resolved);
 
-    PCollection<Row> selected =
-        input
-            .apply(
-                ParDo.of(
-                    new DoFn<T, Row>() {
-                      // TODO: This should be the same as resolved so that 
Beam knows which fields
-                      // are being accessed. Currently Beam only supports 
wildcard descriptors.
-                      // Once BEAM-4457 is fixed, fix this.
-                      @FieldAccess("filterFields")
-                      final FieldAccessDescriptor fieldAccessDescriptor =
-                          FieldAccessDescriptor.withAllFields();
-
-                      @ProcessElement
-                      public void process(
-                          @FieldAccess("filterFields") Row row, 
OutputReceiver<Row> r) {
-                        r.output(selectRow(row, resolved, inputSchema, 
outputSchema));
-                      }
-                    }))
-            .setRowSchema(outputSchema);
-
-    return selected;
+    return input
+        .apply(
+            ParDo.of(
+                new DoFn<T, Row>() {
+                  // TODO: This should be the same as resolved so that Beam 
knows which fields
+                  // are being accessed. Currently Beam only supports wildcard 
descriptors.
+                  // Once BEAM-4457 is fixed, fix this.
+                  @FieldAccess("selectFields")
+                  final FieldAccessDescriptor fieldAccessDescriptor =
+                      FieldAccessDescriptor.withAllFields();
+
+                  @ProcessElement
+                  public void process(@FieldAccess("selectFields") Row row, 
OutputReceiver<Row> r) {
+                    r.output(selectRow(row, resolved, inputSchema, 
outputSchema));
+                  }
+                }))
+        .setRowSchema(outputSchema);
   }
 
-  // Currently we don't flatten selected nested fields. We should consider 
whether to flatten them
-  // or leave them as is.
+  // Currently we don't flatten selected nested fields.
   static Schema getOutputSchema(Schema inputSchema, FieldAccessDescriptor 
fieldAccessDescriptor) {
-    if (fieldAccessDescriptor.allFields()) {
+    if (fieldAccessDescriptor.getAllFields()) {
       return inputSchema;
     }
     Schema.Builder builder = new Schema.Builder();
     for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) {
       builder.addField(inputSchema.getField(fieldId));
     }
 
-    for (Map.Entry<Integer, FieldAccessDescriptor> nested :
-        fieldAccessDescriptor.nestedFields().entrySet()) {
-      Field field = inputSchema.getField(nested.getKey());
-      FieldAccessDescriptor nestedDescriptor = nested.getValue();
-      FieldType nestedType =
-          FieldType.row(getOutputSchema(field.getType().getRowSchema(), 
nestedDescriptor));
-
-      nestedType = nestedType.withNullable(field.getType().getNullable());
-      builder.addField(field.getName(), nestedType);
+    for (Map.Entry<FieldDescriptor, FieldAccessDescriptor> nested :
+        fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) {
+      FieldDescriptor fieldDescriptor = nested.getKey();
+      Field field = 
inputSchema.getField(Preconditions.checkNotNull(fieldDescriptor.getFieldId()));
+      FieldType outputType =
+          getOutputSchemaHelper(
+              field.getType(), nested.getValue(), 
fieldDescriptor.getQualifiers(), 0);
+      builder.addField(field.getName(), outputType);
     }
     return builder.build();
   }
 
+  private static FieldType getOutputSchemaHelper(
+      FieldType inputFieldType,
+      FieldAccessDescriptor fieldAccessDescriptor,
+      List<Qualifier> qualifiers,
+      int qualifierPosition) {
+    if (qualifierPosition >= qualifiers.size()) {
+      // We have walked through any containers, and are at a row type. Extract 
the subschema
+      // for the row, preserving nullable attributes.
+      
Preconditions.checkArgument(inputFieldType.getTypeName().isCompositeType());
+      return FieldType.row(getOutputSchema(inputFieldType.getRowSchema(), 
fieldAccessDescriptor))
+          .withNullable(inputFieldType.getNullable());
+    }
+
+    Qualifier qualifier = qualifiers.get(qualifierPosition);
+    switch (qualifier.getKind()) {
+      case LIST:
+        
Preconditions.checkArgument(qualifier.getList().equals(ListQualifier.ALL));
+        FieldType componentType =
+            
Preconditions.checkNotNull(inputFieldType.getCollectionElementType());
+        FieldType outputComponent =
+            getOutputSchemaHelper(
+                    componentType, fieldAccessDescriptor, qualifiers, 
qualifierPosition + 1)
+                .withNullable(componentType.getNullable());
+        return 
FieldType.array(outputComponent).withNullable(inputFieldType.getNullable());
+      case MAP:
+        
Preconditions.checkArgument(qualifier.getMap().equals(MapQualifier.ALL));
+        FieldType keyType = 
Preconditions.checkNotNull(inputFieldType.getMapKeyType());
+        FieldType valueType = 
Preconditions.checkNotNull(inputFieldType.getMapValueType());
+        FieldType outputValueType =
+            getOutputSchemaHelper(
+                    valueType, fieldAccessDescriptor, qualifiers, 
qualifierPosition + 1)
+                .withNullable(valueType.getNullable());
+        return FieldType.map(keyType, 
outputValueType).withNullable(inputFieldType.getNullable());
+      default:
+        throw new RuntimeException("unexpected");
+    }
+  }
+
   static Row selectRow(
       Row input,
       FieldAccessDescriptor fieldAccessDescriptor,
       Schema inputSchema,
       Schema outputSchema) {
-    if (fieldAccessDescriptor.allFields()) {
+    if (fieldAccessDescriptor.getAllFields()) {
       return input;
-    } else {
-      Row.Builder output = Row.withSchema(outputSchema);
-      for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) {
-        output.addValue(input.getValue(fieldId));
-      }
-      for (Map.Entry<Integer, FieldAccessDescriptor> nested :
-          fieldAccessDescriptor.nestedFields().entrySet()) {
-        String fieldName = inputSchema.nameOf(nested.getKey());
-        Schema nestedInputSchema = 
inputSchema.getField(nested.getKey()).getType().getRowSchema();
-        Schema nestedOutputSchema = 
outputSchema.getField(fieldName).getType().getRowSchema();
-        output.addValue(
-            selectRow(
-                input.getValue(fieldName),
-                nested.getValue(),
-                nestedInputSchema,
-                nestedOutputSchema));
-      }
-      return output.build();
+    }
+
+    Row.Builder output = Row.withSchema(outputSchema);
+    for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) {
+      // TODO: Once we support specific qualifiers (like array slices), 
extract them here.
+      output.addValue(input.getValue(fieldId));
+    }
+
+    for (Map.Entry<FieldDescriptor, FieldAccessDescriptor> nested :
+        fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) {
+      FieldDescriptor field = nested.getKey();
+      String fieldName = 
inputSchema.nameOf(Preconditions.checkNotNull(field.getFieldId()));
+      FieldType nestedInputType = 
inputSchema.getField(field.getFieldId()).getType();
+      FieldType nestedOutputType = outputSchema.getField(fieldName).getType();
+      Object value =
+          selectRowHelper(
+              field.getQualifiers(),
+              0,
+              input.getValue(fieldName),
+              nested.getValue(),
+              nestedInputType,
+              nestedOutputType);
+      output.addValue(value);
+    }
+    return output.build();
+  }
+
+  @SuppressWarnings("unchecked")
 
 Review comment:
   The @SuppressWarnings("unchecked") is needed because of the unchecked casts to List and Map in the method below.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 186667)
    Time Spent: 1h  (was: 50m)

> Allow textual selection syntax for schema fields
> ------------------------------------------------
>
>                 Key: BEAM-6428
>                 URL: https://issues.apache.org/jira/browse/BEAM-6428
>             Project: Beam
>          Issue Type: Sub-task
>          Components: beam-model
>    Affects Versions: 2.9.0
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to