[ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=290893&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290893
 ]

ASF GitHub Bot logged work on BEAM-7886:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Aug/19 00:09
            Start Date: 08/Aug/19 00:09
    Worklog Time Spent: 10m 
      Work Description: robinyqiu commented on pull request #9188: [BEAM-7886] 
Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r311808396
 
 

 ##########
 File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
 ##########
 @@ -278,41 +290,85 @@ private static Object convertValue(Object value, 
CommonCoder coderSpec, Coder co
       return WindowedValue.of(windowValue, timestamp, windows, paneInfo);
     } else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) {
       return Double.parseDouble((String) value);
+    } else if (s.equals(getUrn(StandardCoders.Enum.ROW))) {
+      Schema schema;
+      try {
+        schema = 
SchemaTranslation.fromProto(SchemaApi.Schema.parseFrom(coderSpec.getPayload()));
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException("Failed to parse schema payload for row 
coder", e);
+      }
+
+      return parseField(value, Schema.FieldType.row(schema));
     } else {
       throw new IllegalStateException("Unknown coder URN: " + 
coderSpec.getUrn());
     }
   }
 
+  private static Object parseField(Object value, Schema.FieldType fieldType) {
+    switch (fieldType.getTypeName()) {
+      case BYTE:
+        return ((Number) value).byteValue();
+      case INT16:
+        return ((Number) value).shortValue();
+      case INT32:
+        return ((Number) value).intValue();
+      case INT64:
+        return ((Number) value).longValue();
+      case FLOAT:
+        return Float.parseFloat((String) value);
+      case DOUBLE:
+        return Double.parseDouble((String) value);
+      case STRING:
+        return (String) value;
+      case BOOLEAN:
+        return (Boolean) value;
+      case BYTES:
+        // extract String as byte[]
+        return ((String) value).getBytes(StandardCharsets.ISO_8859_1);
+      case ARRAY:
+        return ((List<Object>) value)
+            .stream()
+                .map((element) -> parseField(element, 
fieldType.getCollectionElementType()))
+                .collect(toImmutableList());
+      case MAP:
+        Map<Object, Object> kvMap = (Map<Object, Object>) value;
+        return kvMap.entrySet().stream()
+            .collect(
+                toImmutableMap(
+                    (pair) -> parseField(pair.getKey(), 
fieldType.getMapKeyType()),
+                    (pair) -> parseField(pair.getValue(), 
fieldType.getMapValueType())));
+      case ROW:
+        Map<String, Object> rowMap = (Map<String, Object>) value;
+        Schema schema = fieldType.getRowSchema();
+        Row.Builder row = Row.withSchema(schema);
+        for (Schema.Field field : schema.getFields()) {
+          Object element = rowMap.remove(field.getName());
+          if (element != null) {
+            element = parseField(element, field.getType());
+          }
+          row.addValue(element);
+        }
+
+        if (!rowMap.isEmpty()) {
+          throw new IllegalArgumentException(
+              "Value contains keys that are not in the schema: " + 
rowMap.keySet());
+        }
+
+        return row.build();
+      default: // DECIMAL, DATETIME, LOGICAL_TYPE
+        throw new IllegalArgumentException("Unsupported type name: " + 
fieldType.getTypeName());
+    }
+  }
+
   private static Coder<?> instantiateCoder(CommonCoder coder) {
     List<Coder<?>> components = new ArrayList<>();
     for (CommonCoder innerCoder : coder.getComponents()) {
       components.add(instantiateCoder(innerCoder));
     }
-    String s = coder.getUrn();
-    if (s.equals(getUrn(StandardCoders.Enum.BYTES))) {
-      return ByteArrayCoder.of();
-    } else if (s.equals(getUrn(StandardCoders.Enum.STRING_UTF8))) {
-      return StringUtf8Coder.of();
-    } else if (s.equals(getUrn(StandardCoders.Enum.KV))) {
-      return KvCoder.of(components.get(0), components.get(1));
-    } else if (s.equals(getUrn(StandardCoders.Enum.VARINT))) {
-      return VarLongCoder.of();
-    } else if (s.equals(getUrn(StandardCoders.Enum.INTERVAL_WINDOW))) {
-      return IntervalWindowCoder.of();
-    } else if (s.equals(getUrn(StandardCoders.Enum.ITERABLE))) {
-      return IterableCoder.of(components.get(0));
-    } else if (s.equals(getUrn(StandardCoders.Enum.TIMER))) {
-      return Timer.Coder.of(components.get(0));
-    } else if (s.equals(getUrn(StandardCoders.Enum.GLOBAL_WINDOW))) {
-      return GlobalWindow.Coder.INSTANCE;
-    } else if (s.equals(getUrn(StandardCoders.Enum.WINDOWED_VALUE))) {
-      return WindowedValue.FullWindowedValueCoder.of(
-          components.get(0), (Coder<BoundedWindow>) components.get(1));
-    } else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) {
-      return DoubleCoder.of();
-    } else {
-      throw new IllegalStateException("Unknown coder URN: " + coder.getUrn());
-    }
+    Class<? extends Coder> coderType =
 
 Review comment:
   Thanks for this simplification! Can we check whether the returned value is
null here, and throw a meaningful exception for an unknown URN?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 290893)
    Time Spent: 1h 20m  (was: 1h 10m)

> Make row coder a standard coder and implement in python
> -------------------------------------------------------
>
>                 Key: BEAM-7886
>                 URL: https://issues.apache.org/jira/browse/BEAM-7886
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model, sdk-java-core, sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to