ahmedabu98 commented on code in PR #34525:
URL: https://github.com/apache/beam/pull/34525#discussion_r2064855806


##########
sdks/python/apache_beam/transforms/managed_iceberg_it_test.py:
##########
@@ -72,7 +72,10 @@ def test_write_read_pipeline(self):
     with beam.Pipeline(argv=self.args) as read_pipeline:
       output_dicts = (
           read_pipeline
-          | beam.managed.Read(beam.managed.ICEBERG, config=iceberg_config)
+          | beam.managed.Read(

Review Comment:
   Added another transform in this test



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java:
##########
@@ -205,25 +202,51 @@ Row getConfigurationRow() {
   // May return an empty row (perhaps the underlying transform doesn't have any required
   // parameters)
   @VisibleForTesting
-  static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
+  static Row getRowConfig(ManagedConfig config, Schema transformConfigSchema) {
     Map<String, Object> configMap = config.resolveUnderlyingConfig();
     // Build a config Row that will be used to build the underlying SchemaTransform.
     // If a mapping for the SchemaTransform exists, we use it to update parameter names to align
     // with the underlying SchemaTransform config schema
-    Map<String, String> mapping = MAPPINGS.get(config.getTransformIdentifier());
-    if (mapping != null && configMap != null) {
+    Map<String, String> namingOverride = CONFIG_NAME_OVERRIDES.get(config.getTransformIdentifier());
+    if (namingOverride != null && configMap != null) {
       Map<String, Object> remappedConfig = new HashMap<>();
       for (Map.Entry<String, Object> entry : configMap.entrySet()) {
         String paramName = entry.getKey();
-        if (mapping.containsKey(paramName)) {
-          paramName = mapping.get(paramName);
+        if (namingOverride.containsKey(paramName)) {
+          paramName = namingOverride.get(paramName);
         }
         remappedConfig.put(paramName, entry.getValue());
       }
       configMap = remappedConfig;
     }
 
-    return YamlUtils.toBeamRow(configMap, transformSchema, false);
+    validateUserConfig(
+        config.getTransformIdentifier(), new HashSet<>(configMap.keySet()), transformConfigSchema);
+
+    return YamlUtils.toBeamRow(configMap, transformConfigSchema, false);
+  }
+
+  static void validateUserConfig(
+      String transformId, Set<String> userParams, Schema transformConfigSchema) {
+    List<String> missingRequiredFields = new ArrayList<>();
+    for (Schema.Field field : transformConfigSchema.getFields()) {
+      boolean inUserConfig = userParams.remove(field.getName());
+      if (!field.getType().getNullable() && !inUserConfig) {
+        missingRequiredFields.add(field.getName());
+      }
+    }
+
+    if (!missingRequiredFields.isEmpty() || !userParams.isEmpty()) {
+      String msg = "Invalid config for transform '" + transformId + "':";
+      if (!missingRequiredFields.isEmpty()) {
+        msg += " Missing required fields: " + missingRequiredFields + ".";
+      }
+      if (!userParams.isEmpty()) {
+        msg += " Contains unknown fields: " + userParams + ".";

Review Comment:
   done



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java:
##########
@@ -205,25 +210,57 @@ Row getConfigurationRow() {
   // May return an empty row (perhaps the underlying transform doesn't have any required
   // parameters)
   @VisibleForTesting
-  static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
+  static Row getRowConfig(
+      ManagedConfig config, Schema transformConfigSchema, PipelineOptions options) {
     Map<String, Object> configMap = config.resolveUnderlyingConfig();
     // Build a config Row that will be used to build the underlying SchemaTransform.
     // If a mapping for the SchemaTransform exists, we use it to update parameter names to align
     // with the underlying SchemaTransform config schema
-    Map<String, String> mapping = MAPPINGS.get(config.getTransformIdentifier());
-    if (mapping != null && configMap != null) {
+    Map<String, String> namingOverride = CONFIG_NAME_OVERRIDES.get(config.getTransformIdentifier());
+    if (namingOverride != null && configMap != null) {
       Map<String, Object> remappedConfig = new HashMap<>();
       for (Map.Entry<String, Object> entry : configMap.entrySet()) {
         String paramName = entry.getKey();
-        if (mapping.containsKey(paramName)) {
-          paramName = mapping.get(paramName);
+        if (namingOverride.containsKey(paramName)) {
+          paramName = namingOverride.get(paramName);
         }
         remappedConfig.put(paramName, entry.getValue());
       }
       configMap = remappedConfig;
     }
 
-    return YamlUtils.toBeamRow(configMap, transformSchema, false);
+    @Nullable Boolean skipValidation = config.getSkipConfigValidation();
+    if (skipValidation == null || !skipValidation) {
+      validateUserConfig(
+          config.getTransformIdentifier(),
+          new HashSet<>(configMap.keySet()),
+          transformConfigSchema);
+    }
+
+    return YamlUtils.toBeamRow(configMap, transformConfigSchema, false);
+  }
+
+  static void validateUserConfig(
+      String transformId, Set<String> userParams, Schema transformConfigSchema) {
+    List<String> missingRequiredFields = new ArrayList<>();
+    for (Schema.Field field : transformConfigSchema.getFields()) {
+      boolean inUserConfig = userParams.remove(field.getName());
+      if (!field.getType().getNullable() && !inUserConfig) {
+        missingRequiredFields.add(field.getName());
+      }
+    }
+
+    if (!missingRequiredFields.isEmpty() || !userParams.isEmpty()) {
+      String msg = "Invalid config for transform '" + transformId + "':";
+      if (!missingRequiredFields.isEmpty()) {
+        msg += " Missing required fields: " + missingRequiredFields + ".";

Review Comment:
   Correct. This is just to make the error more readable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to