robinyqiu commented on a change in pull request #13588:
URL: https://github.com/apache/beam/pull/13588#discussion_r549882910



##########
File path: 
sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
##########
@@ -98,4 +98,91 @@ private static FieldType getBeamFieldType(ColumnSchema 
column) {
     throw new UnsupportedOperationException(
         "Field type '" + dcFieldType + "' is not supported (field '" + 
column.getColumn() + "')");
   }
+
+  /** Convert Beam schema to DataCatalog schema. */
+  public static com.google.cloud.datacatalog.v1beta1.Schema 
toDataCatalog(Schema schema) {
+    com.google.cloud.datacatalog.v1beta1.Schema.Builder schemaBuilder =
+        com.google.cloud.datacatalog.v1beta1.Schema.newBuilder();
+    for (Schema.Field field : schema.getFields()) {
+      schemaBuilder.addColumns(fromBeamField(field));
+    }
+    return schemaBuilder.build();
+  }
+
+  private static ColumnSchema fromBeamField(Schema.Field field) {
+    Schema.FieldType fieldType = field.getType();
+    if (fieldType.getTypeName().equals(Schema.TypeName.ARRAY)) {
+      if (fieldType.getNullable()) {
+        throw new UnsupportedOperationException(
+            "Nullable array type is not supported in DataCatalog schemas: " + 
fieldType);
+      } else if (fieldType.getCollectionElementType().getNullable()) {
+        throw new UnsupportedOperationException(
+            "Nullable array element type is not supported in DataCatalog 
schemas: " + fieldType);
+      } else if 
(fieldType.getCollectionElementType().getTypeName().equals(Schema.TypeName.ARRAY))
 {
+        throw new UnsupportedOperationException(
+            "Array of arrays not supported in DataCatalog schemas: " + 
fieldType);
+      }
+      ColumnSchema column =
+          fromBeamField(Field.of(field.getName(), 
fieldType.getCollectionElementType()));
+      if (!column.getMode().isEmpty()) {
+        // We should have bailed out earlier for any cases that would result 
in mode being set.
+        throw new AssertionError(
+            "ColumnSchema for collection element type has non-empty mode: " + 
fieldType);
+      }
+      return column.toBuilder().setMode("REPEATED").build();
+    } else { // struct or primitive type
+      ColumnSchema.Builder colBuilder =
+          ColumnSchema.newBuilder().setType(getDataCatalogType(fieldType));
+
+      if (fieldType.getNullable()) {
+        colBuilder.setMode("NULLABLE");
+      }
+
+      // if this is a struct, add the child columns
+      if (fieldType.getTypeName().equals(Schema.TypeName.ROW)) {
+        for (Schema.Field subField : fieldType.getRowSchema().getFields()) {
+          colBuilder.addSubcolumns(fromBeamField(subField));
+        }
+      }
+
+      return colBuilder.setColumn(field.getName()).build();
+    }
+  }
+
+  private static String getDataCatalogType(FieldType fieldType) {
+    switch (fieldType.getTypeName()) {
+      case INT32:
+      case INT64:
+      case BYTES:
+      case DOUBLE:
+      case STRING:
+        return fieldType.getTypeName().name();
+      case BOOLEAN:
+        return "BOOL";
+      case DATETIME:
+        return "TIMESTAMP";
+      case DECIMAL:
+        return "NUMERIC";
+      case LOGICAL_TYPE:
+        Schema.LogicalType logical = fieldType.getLogicalType();
+        if (SqlTypes.TIME.getIdentifier().equals(logical.getIdentifier())) {
+          return "TIME";
+        } else if 
(SqlTypes.DATE.getIdentifier().equals(logical.getIdentifier())) {
+          return "DATE";
+        } else if 
(SqlTypes.DATETIME.getIdentifier().equals(logical.getIdentifier())) {
+          return "DATETIME";
+        } else {
+          throw new UnsupportedOperationException("Unsupported logical type: " 
+ logical);
+        }
+      case ROW:
+        return "STRUCT";
+      case MAP:
+        return String.format(
+            "MAP<%s,%s>",
+            getDataCatalogType(fieldType.getMapKeyType()),
+            getDataCatalogType(fieldType.getMapValueType()));
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + 
fieldType);
+    }
+  }

Review comment:
       Tests added. While implementing the test I found an inconsistency in our 
implementation:
   
   1) In `fromDataCatalog` we treat a DC field without any mode set as 
`NULLABLE`
   2) In `toDataCatalog` we do not set any mode for a Beam field whose 
`getNullable` returns false, whereas I think we should set the mode to 
`REQUIRED` (fixed in the last commit, PTAL)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to