ahmedabu98 commented on code in PR #34525:
URL: https://github.com/apache/beam/pull/34525#discussion_r2064855806
########## sdks/python/apache_beam/transforms/managed_iceberg_it_test.py: ########## @@ -72,7 +72,10 @@ def test_write_read_pipeline(self): with beam.Pipeline(argv=self.args) as read_pipeline: output_dicts = ( read_pipeline - | beam.managed.Read(beam.managed.ICEBERG, config=iceberg_config) + | beam.managed.Read( Review Comment: Added another transform in this test ########## sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java: ########## @@ -205,25 +202,51 @@ Row getConfigurationRow() { // May return an empty row (perhaps the underlying transform doesn't have any required // parameters) @VisibleForTesting - static Row getRowConfig(ManagedConfig config, Schema transformSchema) { + static Row getRowConfig(ManagedConfig config, Schema transformConfigSchema) { Map<String, Object> configMap = config.resolveUnderlyingConfig(); // Build a config Row that will be used to build the underlying SchemaTransform. // If a mapping for the SchemaTransform exists, we use it to update parameter names to align // with the underlying SchemaTransform config schema - Map<String, String> mapping = MAPPINGS.get(config.getTransformIdentifier()); - if (mapping != null && configMap != null) { + Map<String, String> namingOverride = CONFIG_NAME_OVERRIDES.get(config.getTransformIdentifier()); + if (namingOverride != null && configMap != null) { Map<String, Object> remappedConfig = new HashMap<>(); for (Map.Entry<String, Object> entry : configMap.entrySet()) { String paramName = entry.getKey(); - if (mapping.containsKey(paramName)) { - paramName = mapping.get(paramName); + if (namingOverride.containsKey(paramName)) { + paramName = namingOverride.get(paramName); } remappedConfig.put(paramName, entry.getValue()); } configMap = remappedConfig; } - return YamlUtils.toBeamRow(configMap, transformSchema, false); + validateUserConfig( + config.getTransformIdentifier(), new HashSet<>(configMap.keySet()), transformConfigSchema); + + return 
YamlUtils.toBeamRow(configMap, transformConfigSchema, false); + } + + static void validateUserConfig( + String transformId, Set<String> userParams, Schema transformConfigSchema) { + List<String> missingRequiredFields = new ArrayList<>(); + for (Schema.Field field : transformConfigSchema.getFields()) { + boolean inUserConfig = userParams.remove(field.getName()); + if (!field.getType().getNullable() && !inUserConfig) { + missingRequiredFields.add(field.getName()); + } + } + + if (!missingRequiredFields.isEmpty() || !userParams.isEmpty()) { + String msg = "Invalid config for transform '" + transformId + "':"; + if (!missingRequiredFields.isEmpty()) { + msg += " Missing required fields: " + missingRequiredFields + "."; + } + if (!userParams.isEmpty()) { + msg += " Contains unknown fields: " + userParams + "."; Review Comment: done ########## sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java: ########## @@ -205,25 +210,57 @@ Row getConfigurationRow() { // May return an empty row (perhaps the underlying transform doesn't have any required // parameters) @VisibleForTesting - static Row getRowConfig(ManagedConfig config, Schema transformSchema) { + static Row getRowConfig( + ManagedConfig config, Schema transformConfigSchema, PipelineOptions options) { Map<String, Object> configMap = config.resolveUnderlyingConfig(); // Build a config Row that will be used to build the underlying SchemaTransform. 
// If a mapping for the SchemaTransform exists, we use it to update parameter names to align // with the underlying SchemaTransform config schema - Map<String, String> mapping = MAPPINGS.get(config.getTransformIdentifier()); - if (mapping != null && configMap != null) { + Map<String, String> namingOverride = CONFIG_NAME_OVERRIDES.get(config.getTransformIdentifier()); + if (namingOverride != null && configMap != null) { Map<String, Object> remappedConfig = new HashMap<>(); for (Map.Entry<String, Object> entry : configMap.entrySet()) { String paramName = entry.getKey(); - if (mapping.containsKey(paramName)) { - paramName = mapping.get(paramName); + if (namingOverride.containsKey(paramName)) { + paramName = namingOverride.get(paramName); } remappedConfig.put(paramName, entry.getValue()); } configMap = remappedConfig; } - return YamlUtils.toBeamRow(configMap, transformSchema, false); + @Nullable Boolean skipValidation = config.getSkipConfigValidation(); + if (skipValidation == null || !skipValidation) { + validateUserConfig( + config.getTransformIdentifier(), + new HashSet<>(configMap.keySet()), + transformConfigSchema); + } + + return YamlUtils.toBeamRow(configMap, transformConfigSchema, false); + } + + static void validateUserConfig( + String transformId, Set<String> userParams, Schema transformConfigSchema) { + List<String> missingRequiredFields = new ArrayList<>(); + for (Schema.Field field : transformConfigSchema.getFields()) { + boolean inUserConfig = userParams.remove(field.getName()); + if (!field.getType().getNullable() && !inUserConfig) { + missingRequiredFields.add(field.getName()); + } + } + + if (!missingRequiredFields.isEmpty() || !userParams.isEmpty()) { + String msg = "Invalid config for transform '" + transformId + "':"; + if (!missingRequiredFields.isEmpty()) { + msg += " Missing required fields: " + missingRequiredFields + "."; Review Comment: Correct. This is just to make the error more readable. 
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org