wuchong commented on code in PR #2178:
URL: https://github.com/apache/fluss/pull/2178#discussion_r2642532582


##########
fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java:
##########
@@ -191,7 +191,8 @@ private KvTablet createKvTablet(
                 rowMerger,
                 DEFAULT_COMPRESSION,
                 schemaGetter,
-                tableConf.getChangelogImage());
+                tableConf.getChangelogImage(),
+                null);

Review Comment:
   ditto



##########
website/docs/engine-flink/ddl.md:
##########
@@ -406,3 +406,252 @@ WITH (
 - Only continuous refresh mode is supported
 - Schema is automatically derived from the query
 - Materialized tables are stored as regular Fluss tables with special metadata
+
+## Procedures

Review Comment:
   What I mean is to place the "Procedures" documentation as a separate page 
under the "DDL" page. Additionally, each procedure method should be formatted 
as an H3 or H2 header, so it appears in the TOC (table of contents) and can be 
directly linked via URL.



##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java:
##########
@@ -115,68 +148,296 @@ boolean isAllowedConfig(String key) {
 
     private void updateCurrentConfig(Map<String, String> newDynamicConfigs, 
boolean skipErrorConfig)
             throws Exception {
-        Map<String, String> newProps = new HashMap<>(initialConfigMap);
-        overrideProps(newProps, newDynamicConfigs);
-        Configuration newConfig = Configuration.fromMap(newProps);
-        Configuration oldConfig = currentConfig;
-        Set<ServerReconfigurable> appliedServerReconfigurableSet = new 
HashSet<>();
-        if (!newProps.equals(currentConfigMap)) {
-            serverReconfigures
-                    .values()
-                    .forEach(
-                            serverReconfigurable -> {
-                                try {
-                                    serverReconfigurable.validate(newConfig);
-                                } catch (ConfigException e) {
-                                    LOG.error(
-                                            "Validate new dynamic config error 
and will roll back all the applied config.",
-                                            e);
-                                    if (!skipErrorConfig) {
-                                        throw e;
-                                    }
-                                }
-                            });
-
-            Exception throwable = null;
-            for (ServerReconfigurable serverReconfigurable : 
serverReconfigures.values()) {
-                try {
-                    serverReconfigurable.reconfigure(newConfig);
-                    appliedServerReconfigurableSet.add(serverReconfigurable);
-                } catch (ConfigException e) {
-                    LOG.error(
-                            "Apply new dynamic error and will roll back all 
the applied config.",
-                            e);
-                    if (!skipErrorConfig) {
-                        throwable = e;
-                        break;
-                    }
-                }
+        // Compute effective config changes (merge with initial configs)
+        Map<String, String> effectiveChanges =
+                computeEffectiveChanges(newDynamicConfigs, skipErrorConfig);
+
+        // Early return if no effective changes
+        if (effectiveChanges.isEmpty()) {
+            return;
+        }
+
+        // Build new configuration by merging initial + dynamic configs
+        Map<String, String> newConfigMap = buildConfigMap(effectiveChanges);
+        Configuration newConfig = Configuration.fromMap(newConfigMap);
+
+        // Apply changes to all registered ServerReconfigurable instances
+        applyToServerReconfigurables(newConfig, skipErrorConfig);
+
+        // Update internal state
+        updateInternalState(newConfig, newConfigMap, newDynamicConfigs);
+        LOG.info("Dynamic configs changed: {}", newDynamicConfigs);

Review Comment:
   I think logging the the chagned configs `effectiveChanges` is more helpful 
than the full set `newDynamicConfigs`.



##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java:
##########
@@ -115,68 +148,296 @@ boolean isAllowedConfig(String key) {
 
     private void updateCurrentConfig(Map<String, String> newDynamicConfigs, 
boolean skipErrorConfig)
             throws Exception {
-        Map<String, String> newProps = new HashMap<>(initialConfigMap);
-        overrideProps(newProps, newDynamicConfigs);
-        Configuration newConfig = Configuration.fromMap(newProps);
-        Configuration oldConfig = currentConfig;
-        Set<ServerReconfigurable> appliedServerReconfigurableSet = new 
HashSet<>();
-        if (!newProps.equals(currentConfigMap)) {
-            serverReconfigures
-                    .values()
-                    .forEach(
-                            serverReconfigurable -> {
-                                try {
-                                    serverReconfigurable.validate(newConfig);
-                                } catch (ConfigException e) {
-                                    LOG.error(
-                                            "Validate new dynamic config error 
and will roll back all the applied config.",
-                                            e);
-                                    if (!skipErrorConfig) {
-                                        throw e;
-                                    }
-                                }
-                            });
-
-            Exception throwable = null;
-            for (ServerReconfigurable serverReconfigurable : 
serverReconfigures.values()) {
-                try {
-                    serverReconfigurable.reconfigure(newConfig);
-                    appliedServerReconfigurableSet.add(serverReconfigurable);
-                } catch (ConfigException e) {
-                    LOG.error(
-                            "Apply new dynamic error and will roll back all 
the applied config.",
-                            e);
-                    if (!skipErrorConfig) {
-                        throwable = e;
-                        break;
-                    }
-                }
+        // Compute effective config changes (merge with initial configs)
+        Map<String, String> effectiveChanges =
+                computeEffectiveChanges(newDynamicConfigs, skipErrorConfig);
+
+        // Early return if no effective changes
+        if (effectiveChanges.isEmpty()) {
+            return;

Review Comment:
   It would be helpful to log an info message when no changes are detected. 
This aids debugging, for example, when an `ALTER CONFIG` command appears to 
have no effect.



##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java:
##########
@@ -115,68 +148,296 @@ boolean isAllowedConfig(String key) {
 
     private void updateCurrentConfig(Map<String, String> newDynamicConfigs, 
boolean skipErrorConfig)
             throws Exception {
-        Map<String, String> newProps = new HashMap<>(initialConfigMap);
-        overrideProps(newProps, newDynamicConfigs);
-        Configuration newConfig = Configuration.fromMap(newProps);
-        Configuration oldConfig = currentConfig;
-        Set<ServerReconfigurable> appliedServerReconfigurableSet = new 
HashSet<>();
-        if (!newProps.equals(currentConfigMap)) {
-            serverReconfigures
-                    .values()
-                    .forEach(
-                            serverReconfigurable -> {
-                                try {
-                                    serverReconfigurable.validate(newConfig);
-                                } catch (ConfigException e) {
-                                    LOG.error(
-                                            "Validate new dynamic config error 
and will roll back all the applied config.",
-                                            e);
-                                    if (!skipErrorConfig) {
-                                        throw e;
-                                    }
-                                }
-                            });
-
-            Exception throwable = null;
-            for (ServerReconfigurable serverReconfigurable : 
serverReconfigures.values()) {
-                try {
-                    serverReconfigurable.reconfigure(newConfig);
-                    appliedServerReconfigurableSet.add(serverReconfigurable);
-                } catch (ConfigException e) {
-                    LOG.error(
-                            "Apply new dynamic error and will roll back all 
the applied config.",
-                            e);
-                    if (!skipErrorConfig) {
-                        throwable = e;
-                        break;
-                    }
-                }
+        // Compute effective config changes (merge with initial configs)
+        Map<String, String> effectiveChanges =
+                computeEffectiveChanges(newDynamicConfigs, skipErrorConfig);
+
+        // Early return if no effective changes
+        if (effectiveChanges.isEmpty()) {
+            return;
+        }
+
+        // Build new configuration by merging initial + dynamic configs
+        Map<String, String> newConfigMap = buildConfigMap(effectiveChanges);
+        Configuration newConfig = Configuration.fromMap(newConfigMap);
+
+        // Apply changes to all registered ServerReconfigurable instances
+        applyToServerReconfigurables(newConfig, skipErrorConfig);
+
+        // Update internal state
+        updateInternalState(newConfig, newConfigMap, newDynamicConfigs);
+        LOG.info("Dynamic configs changed: {}", newDynamicConfigs);
+    }
+
+    /**
+     * Computes effective config changes by validating new configs and 
handling deletions.
+     *
+     * @param newDynamicConfigs new dynamic configs from ZooKeeper
+     * @param skipErrorConfig whether to skip invalid configs
+     * @return map of config changes that passed validation
+     * @throws ConfigException if validation fails and skipErrorConfig is false
+     */
+    private Map<String, String> computeEffectiveChanges(
+            Map<String, String> newDynamicConfigs, boolean skipErrorConfig) 
throws ConfigException {
+        Map<String, String> effectiveChanges = new HashMap<>();
+        Set<String> skippedConfigs = new HashSet<>();
+
+        // Process deleted configs: restore to initial value or remove
+        processDeletions(newDynamicConfigs, effectiveChanges, skippedConfigs, 
skipErrorConfig);
+
+        // Process added/modified configs
+        processModifications(newDynamicConfigs, effectiveChanges, 
skippedConfigs, skipErrorConfig);
+
+        if (!skippedConfigs.isEmpty()) {
+            LOG.warn("Skipped invalid configs: {}", skippedConfigs);
+        }
+
+        return effectiveChanges;
+    }
+
+    /** Processes config deletions by restoring to initial values or removing 
them. */
+    private void processDeletions(
+            Map<String, String> newDynamicConfigs,
+            Map<String, String> effectiveChanges,
+            Set<String> skippedConfigs,
+            boolean skipErrorConfig)
+            throws ConfigException {
+        for (String configKey : dynamicConfigs.keySet()) {
+            if (newDynamicConfigs.containsKey(configKey)) {
+                continue; // Not deleted
+            }
+
+            String currentValue = currentConfigMap.get(configKey);
+            String initialValue = initialConfigMap.get(configKey);
+
+            // Determine target value: initial value or null (removal)
+            String targetValue = initialValue;
+
+            // Skip if no change needed (already at initial value)
+            if (Objects.equals(currentValue, targetValue)) {
+                continue;
+            }
+
+            // Validate the change
+            if (validateConfigChange(
+                    configKey, currentValue, targetValue, skippedConfigs, 
skipErrorConfig)) {
+                effectiveChanges.put(configKey, targetValue);
             }
+        }
+    }
+
+    /** Processes config additions and modifications. */
+    private void processModifications(
+            Map<String, String> newDynamicConfigs,
+            Map<String, String> effectiveChanges,
+            Set<String> skippedConfigs,
+            boolean skipErrorConfig)
+            throws ConfigException {
+        for (Map.Entry<String, String> entry : newDynamicConfigs.entrySet()) {
+            String configKey = entry.getKey();
+            String newValue = entry.getValue();
+            String currentValue = currentConfigMap.get(configKey);
 
-            // rollback to old config if there is an error.
-            if (throwable != null) {
-                appliedServerReconfigurableSet.forEach(
-                        serverReconfigurable -> 
serverReconfigurable.reconfigure(oldConfig));
-                throw throwable;
+            // Skip if value unchanged
+            if (Objects.equals(currentValue, newValue)) {
+                continue;
             }
 
-            currentConfig = newConfig;
-            currentConfigMap.clear();
-            dynamicConfigs.clear();
-            currentConfigMap.putAll(newProps);
-            dynamicConfigs.putAll(newDynamicConfigs);
-            LOG.info("Dynamic configs changed: {}", newDynamicConfigs);
+            // Validate and add to effective changes
+            if (validateConfigChange(
+                    configKey, currentValue, newValue, skippedConfigs, 
skipErrorConfig)) {
+                effectiveChanges.put(configKey, newValue);
+            }
         }
     }
 
-    private void overrideProps(Map<String, String> props, Map<String, String> 
propsOverride) {
-        propsOverride.forEach(
+    /**
+     * Validates a single config change.
+     *
+     * @return true if validation passed, false if skipped due to error
+     * @throws ConfigException if validation fails and skipErrorConfig is false
+     */
+    private boolean validateConfigChange(
+            String configKey,
+            String oldValue,
+            String newValue,
+            Set<String> skippedConfigs,
+            boolean skipErrorConfig)
+            throws ConfigException {
+        try {
+            validateSingleConfig(configKey, oldValue, newValue);
+            return true;
+        } catch (ConfigException e) {
+            LOG.error(
+                    "Config validation failed for '{}': {} -> {}. {}",
+                    configKey,
+                    oldValue,
+                    newValue,
+                    e.getMessage());
+            if (skipErrorConfig) {
+                skippedConfigs.add(configKey);
+                return false;
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    /** Builds final config map by merging initial configs with effective 
changes. */
+    private Map<String, String> buildConfigMap(Map<String, String> 
effectiveChanges) {
+        Map<String, String> configMap = new HashMap<>(initialConfigMap);
+        effectiveChanges.forEach(
                 (key, value) -> {
                     if (value == null) {
-                        props.remove(key);
+                        configMap.remove(key);
                     } else {
-                        props.put(key, value);
+                        configMap.put(key, value);
                     }
                 });
+        return configMap;
+    }
+
+    /** Updates internal state after successful reconfiguration. */
+    private void updateInternalState(
+            Configuration newConfig,
+            Map<String, String> newConfigMap,
+            Map<String, String> newDynamicConfigs) {
+        currentConfig = newConfig;
+        currentConfigMap.clear();
+        currentConfigMap.putAll(newConfigMap);
+        dynamicConfigs.clear();
+        dynamicConfigs.putAll(newDynamicConfigs);
+    }
+
+    /**
+     * Validates a single config entry including type parsing and business 
validation.
+     *
+     * @param configKey config key
+     * @param oldValueStr old value string
+     * @param newValueStr new value string
+     * @throws ConfigException if validation fails
+     */
+    private void validateSingleConfig(String configKey, String oldValueStr, 
String newValueStr)
+            throws ConfigException {
+        // Get ConfigOption for type information
+        ConfigOption<?> configOption = 
ConfigOptions.getConfigOption(configKey);
+
+        // For configs with allowed prefixes (like "datalake."), skip 
ConfigOption validation
+        // and rely on ServerReconfigurable's business validation
+        boolean hasPrefixConfig = false;
+        for (String prefix : ALLOWED_CONFIG_PREFIXES) {
+            if (configKey.startsWith(prefix)) {
+                hasPrefixConfig = true;
+                break;
+            }
+        }
+
+        if (configOption == null && !hasPrefixConfig) {
+            throw new ConfigException(
+                    String.format("No ConfigOption found for config key: %s", 
configKey));
+        }
+
+        // Parse and validate type only if ConfigOption exists
+        Object newValue = null;
+        if (configOption != null && newValueStr != null) {
+            Configuration tempConfig = new Configuration();
+            tempConfig.setString(configKey, newValueStr);
+            try {
+                newValue = tempConfig.getOptional(configOption).get();
+            } catch (Exception e) {
+                throw new ConfigException(
+                        String.format(
+                                "Cannot parse '%s' as %s for config '%s'",
+                                newValueStr,
+                                configOption.isList()
+                                        ? "List<" + 
configOption.getClazz().getSimpleName() + ">"
+                                        : 
configOption.getClazz().getSimpleName(),
+                                configKey),
+                        e);

Review Comment:
   Add the exception message into `ConfigException` top error message, because 
`ApiException` will truncate the exception stack and will hidden the root 
cause. 



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java:
##########
@@ -73,29 +74,41 @@ public class RocksDBResourceContainer implements 
AutoCloseable {
 
     private final boolean enableStatistics;
 
+    /** The shared rate limiter for all RocksDB instances. */
+    private final RateLimiter sharedRateLimiter;
+
     /** The handles to be closed when the container is closed. */
     private final ArrayList<AutoCloseable> handlesToClose;
 
     @VisibleForTesting
     RocksDBResourceContainer() {
-        this(new Configuration(), null, false);
+        this(new Configuration(), null, false, null);
     }
 
     public RocksDBResourceContainer(ReadableConfig configuration, @Nullable 
File instanceBasePath) {
-        this(configuration, instanceBasePath, false);
+        this(configuration, instanceBasePath, false, null);
     }
 
     public RocksDBResourceContainer(
             ReadableConfig configuration,
             @Nullable File instanceBasePath,
             boolean enableStatistics) {
+        this(configuration, instanceBasePath, enableStatistics, null);

Review Comment:
   **Nit:** It would be better to use a default `RateLimiter` instance instead 
of `null`, and add a `checkNotNull` guard for the `sharedRateLimiter` parameter 
in the constructor.  
   
   Currently, many places still pass `null` for `sharedRateLimiter`. While this 
is currently used test-only, it risks leaking into production code and could 
introduce subtle bugs in the future. Using a default rate limiter would make 
the API safer and more robust.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to