TheNeuralBit commented on a change in pull request #13588:
URL: https://github.com/apache/beam/pull/13588#discussion_r546971301



##########
File path: 
sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
##########
@@ -53,7 +53,7 @@
           .build();
 
   /** Convert DataCatalog schema to Beam schema. */
-  static Schema fromDataCatalog(com.google.cloud.datacatalog.v1beta1.Schema 
dcSchema) {
+  public static Schema 
fromDataCatalog(com.google.cloud.datacatalog.v1beta1.Schema dcSchema) {

Review comment:
       Rather than making these functions public so our internal code can use 
them, could we try to move the internal code into Beam? I think it should be 
possible to move it into DataCatalogTableProvider.

##########
File path: 
sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
##########
@@ -98,4 +98,91 @@ private static FieldType getBeamFieldType(ColumnSchema 
column) {
     throw new UnsupportedOperationException(
         "Field type '" + dcFieldType + "' is not supported (field '" + 
column.getColumn() + "')");
   }
+
+  /**
+   * Convert Beam schema to DataCatalog schema.
+   *
+   * <p>Every field returned by {@code schema.getFields()} is converted with
+   * {@code fromBeamField} and added, in iteration order, as one column of the result.
+   *
+   * @param schema the Beam schema to convert
+   * @return a DataCatalog schema with one column per field of {@code schema}
+   * @throws UnsupportedOperationException if a field has a type or array shape that the
+   *     per-field conversion rejects
+   */
+  public static com.google.cloud.datacatalog.v1beta1.Schema 
toDataCatalog(Schema schema) {
+    com.google.cloud.datacatalog.v1beta1.Schema.Builder schemaBuilder =
+        com.google.cloud.datacatalog.v1beta1.Schema.newBuilder();
+    for (Schema.Field field : schema.getFields()) {
+      schemaBuilder.addColumns(fromBeamField(field));
+    }
+    return schemaBuilder.build();
+  }
+  /**
+   * Convert a single Beam field to a DataCatalog {@code ColumnSchema}.
+   *
+   * <p>An ARRAY field is converted by converting its element type under the same field name
+   * and then setting the resulting column's mode to "REPEATED". Nullable arrays, arrays with
+   * nullable elements and arrays of arrays are rejected. Any other field gets the type string
+   * produced by {@code getDataCatalogType}, mode "NULLABLE" when the field type is nullable
+   * (mode is left unset otherwise) and, for ROW fields, one subcolumn per field of the row
+   * schema, converted recursively.
+   *
+   * @param field the Beam field to convert
+   * @return the DataCatalog column built from {@code field}
+   * @throws UnsupportedOperationException if the field uses a rejected array shape or a type
+   *     that {@code getDataCatalogType} does not map
+   */
+  private static ColumnSchema fromBeamField(Schema.Field field) {
+    Schema.FieldType fieldType = field.getType();
+    if (fieldType.getTypeName().equals(Schema.TypeName.ARRAY)) {
+      if (fieldType.getNullable()) {
+        throw new UnsupportedOperationException(
+            "Nullable array type is not supported in DataCatalog schemas: " + 
fieldType);
+      } else if (fieldType.getCollectionElementType().getNullable()) {
+        throw new UnsupportedOperationException(
+            "Nullable array element type is not supported in DataCatalog 
schemas: " + fieldType);
+      } else if 
(fieldType.getCollectionElementType().getTypeName().equals(Schema.TypeName.ARRAY))
 {
+        throw new UnsupportedOperationException(
+            "Array of arrays not supported in DataCatalog schemas: " + 
fieldType);
+      }
+      ColumnSchema column =
+          fromBeamField(Field.of(field.getName(), 
fieldType.getCollectionElementType()));
+      if (!column.getMode().isEmpty()) {
+        // We should have bailed out earlier for any cases that would result 
in mode being set.
+        throw new AssertionError(
+            "ColumnSchema for collection element type has non-empty mode: " + 
fieldType);
+      }
+      return column.toBuilder().setMode("REPEATED").build(); // element column, marked as repeated
+    } else { // any non-array type: struct, map, logical or primitive
+      ColumnSchema.Builder colBuilder =
+          ColumnSchema.newBuilder().setType(getDataCatalogType(fieldType));
+
+      if (fieldType.getNullable()) {
+        colBuilder.setMode("NULLABLE");
+      }
+
+      // If this is a struct (ROW), convert each field of its row schema into a subcolumn.
+      if (fieldType.getTypeName().equals(Schema.TypeName.ROW)) {
+        for (Schema.Field subField : fieldType.getRowSchema().getFields()) {
+          colBuilder.addSubcolumns(fromBeamField(subField));
+        }
+      }
+
+      return colBuilder.setColumn(field.getName()).build();
+    }
+  }
+  /**
+   * Return the DataCatalog type string for a Beam field type.
+   *
+   * <p>INT32, INT64, BYTES, DOUBLE and STRING map to their Beam type name. BOOLEAN maps to
+   * "BOOL", DATETIME to "TIMESTAMP", DECIMAL to "NUMERIC" and ROW to "STRUCT". Logical types
+   * whose identifier equals that of SqlTypes.TIME, SqlTypes.DATE or SqlTypes.DATETIME map to
+   * "TIME", "DATE" and "DATETIME" respectively. MAP maps to {@code MAP<key,value>}, where key
+   * and value are obtained by calling this method on the map's key and value types.
+   *
+   * <p>The nullability of {@code fieldType} is not consulted here.
+   *
+   * @param fieldType the Beam field type to translate
+   * @return the DataCatalog type string
+   * @throws UnsupportedOperationException for any other type name or logical type
+   */
+  private static String getDataCatalogType(FieldType fieldType) {
+    switch (fieldType.getTypeName()) {
+      case INT32:
+      case INT64:
+      case BYTES:
+      case DOUBLE:
+      case STRING:
+        return fieldType.getTypeName().name(); // the Beam type name is used unchanged
+      case BOOLEAN:
+        return "BOOL";
+      case DATETIME:
+        return "TIMESTAMP";
+      case DECIMAL:
+        return "NUMERIC";
+      case LOGICAL_TYPE: // only the SqlTypes TIME, DATE and DATETIME identifiers are mapped
+        Schema.LogicalType logical = fieldType.getLogicalType();
+        if (SqlTypes.TIME.getIdentifier().equals(logical.getIdentifier())) {
+          return "TIME";
+        } else if 
(SqlTypes.DATE.getIdentifier().equals(logical.getIdentifier())) {
+          return "DATE";
+        } else if 
(SqlTypes.DATETIME.getIdentifier().equals(logical.getIdentifier())) {
+          return "DATETIME";
+        } else {
+          throw new UnsupportedOperationException("Unsupported logical type: " 
+ logical);
+        }
+      case ROW:
+        return "STRUCT";
+      case MAP:
+        return String.format(
+            "MAP<%s,%s>",
+            getDataCatalogType(fieldType.getMapKeyType()),
+            getDataCatalogType(fieldType.getMapValueType()));
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + 
fieldType);
+    }
+  }

Review comment:
       Could you add unit tests that convert to/from data catalog types?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to