chamikaramj commented on code in PR #34525:
URL: https://github.com/apache/beam/pull/34525#discussion_r2064697514


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java:
##########
@@ -205,25 +210,57 @@ Row getConfigurationRow() {
   // May return an empty row (perhaps the underlying transform doesn't have 
any required
   // parameters)
   @VisibleForTesting
-  static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
+  static Row getRowConfig(
+      ManagedConfig config, Schema transformConfigSchema, PipelineOptions 
options) {
     Map<String, Object> configMap = config.resolveUnderlyingConfig();
     // Build a config Row that will be used to build the underlying 
SchemaTransform.
     // If a mapping for the SchemaTransform exists, we use it to update 
parameter names to align
     // with the underlying SchemaTransform config schema
-    Map<String, String> mapping = 
MAPPINGS.get(config.getTransformIdentifier());
-    if (mapping != null && configMap != null) {
+    Map<String, String> namingOverride = 
CONFIG_NAME_OVERRIDES.get(config.getTransformIdentifier());
+    if (namingOverride != null && configMap != null) {
       Map<String, Object> remappedConfig = new HashMap<>();
       for (Map.Entry<String, Object> entry : configMap.entrySet()) {
         String paramName = entry.getKey();
-        if (mapping.containsKey(paramName)) {
-          paramName = mapping.get(paramName);
+        if (namingOverride.containsKey(paramName)) {
+          paramName = namingOverride.get(paramName);
         }
         remappedConfig.put(paramName, entry.getValue());
       }
       configMap = remappedConfig;
     }
 
-    return YamlUtils.toBeamRow(configMap, transformSchema, false);
+    @Nullable Boolean skipValidation = config.getSkipConfigValidation();
+    if (skipValidation == null || !skipValidation) {
+      validateUserConfig(
+          config.getTransformIdentifier(),
+          new HashSet<>(configMap.keySet()),
+          transformConfigSchema);
+    }
+
+    return YamlUtils.toBeamRow(configMap, transformConfigSchema, false);
+  }
+
+  static void validateUserConfig(
+      String transformId, Set<String> userParams, Schema 
transformConfigSchema) {
+    List<String> missingRequiredFields = new ArrayList<>();
+    for (Schema.Field field : transformConfigSchema.getFields()) {
+      boolean inUserConfig = userParams.remove(field.getName());
+      if (!field.getType().getNullable() && !inUserConfig) {
+        missingRequiredFields.add(field.getName());
+      }
+    }
+
+    if (!missingRequiredFields.isEmpty() || !userParams.isEmpty()) {
+      String msg = "Invalid config for transform '" + transformId + "':";
+      if (!missingRequiredFields.isEmpty()) {
+        msg += " Missing required fields: " + missingRequiredFields + ".";

Review Comment:
   For missing required fields, we should already fail when constructing a Beam 
Row, right?



##########
sdks/python/apache_beam/transforms/managed_iceberg_it_test.py:
##########
@@ -72,7 +72,10 @@ def test_write_read_pipeline(self):
     with beam.Pipeline(argv=self.args) as read_pipeline:
       output_dicts = (
           read_pipeline
-          | beam.managed.Read(beam.managed.ICEBERG, config=iceberg_config)
+          | beam.managed.Read(

Review Comment:
   Ditto.



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java:
##########
@@ -138,7 +138,7 @@ public void testReadUsingManagedTransform() throws 
Exception {
     Map<String, Object> configMap = new Yaml().load(yamlConfig);
     PCollection<Row> output =
         testPipeline
-            .apply(Managed.read(Managed.ICEBERG).withConfig(configMap))
+            
.apply(Managed.read(Managed.ICEBERG).withConfig(configMap).skipConfigValidation())

Review Comment:
   Do we also need a version of this test that uses config validation?



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java:
##########
@@ -205,25 +202,51 @@ Row getConfigurationRow() {
   // May return an empty row (perhaps the underlying transform doesn't have 
any required
   // parameters)
   @VisibleForTesting
-  static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
+  static Row getRowConfig(ManagedConfig config, Schema transformConfigSchema) {
     Map<String, Object> configMap = config.resolveUnderlyingConfig();
     // Build a config Row that will be used to build the underlying 
SchemaTransform.
     // If a mapping for the SchemaTransform exists, we use it to update 
parameter names to align
     // with the underlying SchemaTransform config schema
-    Map<String, String> mapping = 
MAPPINGS.get(config.getTransformIdentifier());
-    if (mapping != null && configMap != null) {
+    Map<String, String> namingOverride = 
CONFIG_NAME_OVERRIDES.get(config.getTransformIdentifier());
+    if (namingOverride != null && configMap != null) {
       Map<String, Object> remappedConfig = new HashMap<>();
       for (Map.Entry<String, Object> entry : configMap.entrySet()) {
         String paramName = entry.getKey();
-        if (mapping.containsKey(paramName)) {
-          paramName = mapping.get(paramName);
+        if (namingOverride.containsKey(paramName)) {
+          paramName = namingOverride.get(paramName);
         }
         remappedConfig.put(paramName, entry.getValue());
       }
       configMap = remappedConfig;
     }
 
-    return YamlUtils.toBeamRow(configMap, transformSchema, false);
+    validateUserConfig(
+        config.getTransformIdentifier(), new HashSet<>(configMap.keySet()), 
transformConfigSchema);
+
+    return YamlUtils.toBeamRow(configMap, transformConfigSchema, false);
+  }
+
+  static void validateUserConfig(
+      String transformId, Set<String> userParams, Schema 
transformConfigSchema) {
+    List<String> missingRequiredFields = new ArrayList<>();
+    for (Schema.Field field : transformConfigSchema.getFields()) {
+      boolean inUserConfig = userParams.remove(field.getName());
+      if (!field.getType().getNullable() && !inUserConfig) {
+        missingRequiredFields.add(field.getName());
+      }
+    }
+
+    if (!missingRequiredFields.isEmpty() || !userParams.isEmpty()) {
+      String msg = "Invalid config for transform '" + transformId + "':";
+      if (!missingRequiredFields.isEmpty()) {
+        msg += " Missing required fields: " + missingRequiredFields + ".";
+      }
+      if (!userParams.isEmpty()) {
+        msg += " Contains unknown fields: " + userParams + ".";

Review Comment:
   We should mention the "skipConfigValidation" option in the error message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to