ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1158922274


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java:
##########
@@ -128,50 +133,154 @@ public String apply(Row input) {
     };
   }
 
+  public static String jsonSchemaStringFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaFromBeamSchema(beamSchema).toString();
+  }
+
+  public static ObjectSchema jsonSchemaFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaBuilderFromBeamSchema(beamSchema).build();
+  }
+
+  private static ObjectSchema.Builder jsonSchemaBuilderFromBeamSchema(Schema 
beamSchema) {
+    // Beam Schema is strict, so we should not accept additional properties
+    ObjectSchema.Builder jsonSchemaBuilder = 
ObjectSchema.builder().additionalProperties(false);
+
+    for (Field field : beamSchema.getFields()) {
+      String name = field.getName();
+      org.everit.json.schema.Schema propertySchema = 
jsonPropertyFromBeamType(field.getType());
+
+      // Add property and make it required
+      jsonSchemaBuilder = jsonSchemaBuilder.addPropertySchema(name, 
propertySchema);
+      jsonSchemaBuilder.addRequiredProperty(name);
+    }
+
+    return jsonSchemaBuilder;
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+  })
+  private static org.everit.json.schema.Schema 
jsonPropertyFromBeamType(FieldType beamType) {
+    org.everit.json.schema.Schema.Builder<? extends 
org.everit.json.schema.Schema> propertySchema;
+
+    switch (beamType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+        propertySchema = NumberSchema.builder().requiresInteger(true);
+        break;
+      case DECIMAL:
+      case FLOAT:
+      case DOUBLE:
+        propertySchema = NumberSchema.builder();
+        break;
+      case STRING:
+        propertySchema = StringSchema.builder();
+        break;
+      case BOOLEAN:
+        propertySchema = BooleanSchema.builder();
+        break;
+      case ARRAY:
+      case ITERABLE:
+        propertySchema =
+            ArraySchema.builder()
+                
.allItemSchema(jsonPropertyFromBeamType(beamType.getCollectionElementType()));
+        break;
+      case ROW:
+        propertySchema = 
jsonSchemaBuilderFromBeamSchema(beamType.getRowSchema());
+        break;
+
+        // add more Beam to JSON types
+      default:
+        throw new IllegalArgumentException("Unsupported Beam to JSON type: " + 
beamType);
+    }
+
+    if (beamType.getNullable()) {
+      propertySchema = propertySchema.nullable(true);
+    }
+
+    return propertySchema.build();
+  }
+
   public static Schema beamSchemaFromJsonSchema(String jsonSchemaStr) {
     org.everit.json.schema.ObjectSchema jsonSchema = 
jsonSchemaFromString(jsonSchemaStr);
     return beamSchemaFromJsonSchema(jsonSchema);
   }
 
   private static Schema 
beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSchema jsonSchema) {
     Schema.Builder beamSchemaBuilder = Schema.builder();
-    for (Map.Entry<String, org.everit.json.schema.Schema> entry :
-        jsonSchema.getPropertySchemas().entrySet()) {
+    Map<String, org.everit.json.schema.Schema> properties =
+        new HashMap<>(jsonSchema.getPropertySchemas());
+    // Properties in a JSON Schema are stored in a Map object and 
unfortunately don't maintain
+    // order. However, the schema's required properties is a list of property 
names that is
+    // consistent and is in the same order as when the schema was first 
created. To create a
+    // consistent Beam Schema from the same JSON schema, we add Schema Fields 
following this order.
+    // We can guarantee a consistent Beam schema when all JSON properties are 
required.
+    for (String propertyName : jsonSchema.getRequiredProperties()) {
+      org.everit.json.schema.Schema propertySchema = 
properties.get(propertyName);
+      if (propertySchema == null) {
+        throw new IllegalArgumentException("Unable to parse schema " + 
jsonSchema);
+      }
+
+      Boolean isNullable =
+          
Boolean.TRUE.equals(propertySchema.getUnprocessedProperties().get("nullable"));
+      beamSchemaBuilder =
+          addPropertySchemaToBeamSchema(
+              propertyName, propertySchema, beamSchemaBuilder, isNullable);
+      // Remove properties we already added.
+      properties.remove(propertyName, propertySchema);
+    }
+
+    // Now we are potentially left with properties that are not required. Add 
them too.
+    // Note: having more than one non-required properties may result in  
inconsistent
+    // Beam schema field orderings.

Review Comment:
   Properties in a JSON schema do not maintain ordering. When we convert a Beam 
schema to a JSON schema, we add the fields to the required properties in order, 
so when converting from the JSON schema back to a Beam schema, the original field 
order can be recovered from the required-properties list. As for the additional 
(non-required) properties, we won't be able to guarantee the Beam schema order.
   
   Unlike JSON schemas, Beam schemas care about order. So one JSON schema with 
non-required properties can produce more than one Beam schema; by Beam 
standards these would be equivalent but not equal. This code makes a best-effort 
attempt to let a given Beam schema survive a round trip of converting to a JSON 
schema and back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to