wuchong commented on code in PR #2178:
URL: https://github.com/apache/fluss/pull/2178#discussion_r2642532582
##########
fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java:
##########
@@ -191,7 +191,8 @@ private KvTablet createKvTablet(
rowMerger,
DEFAULT_COMPRESSION,
schemaGetter,
- tableConf.getChangelogImage());
+ tableConf.getChangelogImage(),
+ null);
Review Comment:
ditto
##########
website/docs/engine-flink/ddl.md:
##########
@@ -406,3 +406,252 @@ WITH (
- Only continuous refresh mode is supported
- Schema is automatically derived from the query
- Materialized tables are stored as regular Fluss tables with special metadata
+
+## Procedures
Review Comment:
What I mean is to place the "Procedures" documentation as a separate page
under the "DDL" page. Additionally, each procedure method should be formatted
as an H3 or H2 header, so it appears in the TOC (table of contents) and can be
directly linked via URL.
##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java:
##########
@@ -115,68 +148,296 @@ boolean isAllowedConfig(String key) {
private void updateCurrentConfig(Map<String, String> newDynamicConfigs,
boolean skipErrorConfig)
throws Exception {
- Map<String, String> newProps = new HashMap<>(initialConfigMap);
- overrideProps(newProps, newDynamicConfigs);
- Configuration newConfig = Configuration.fromMap(newProps);
- Configuration oldConfig = currentConfig;
- Set<ServerReconfigurable> appliedServerReconfigurableSet = new
HashSet<>();
- if (!newProps.equals(currentConfigMap)) {
- serverReconfigures
- .values()
- .forEach(
- serverReconfigurable -> {
- try {
- serverReconfigurable.validate(newConfig);
- } catch (ConfigException e) {
- LOG.error(
- "Validate new dynamic config error
and will roll back all the applied config.",
- e);
- if (!skipErrorConfig) {
- throw e;
- }
- }
- });
-
- Exception throwable = null;
- for (ServerReconfigurable serverReconfigurable :
serverReconfigures.values()) {
- try {
- serverReconfigurable.reconfigure(newConfig);
- appliedServerReconfigurableSet.add(serverReconfigurable);
- } catch (ConfigException e) {
- LOG.error(
- "Apply new dynamic error and will roll back all
the applied config.",
- e);
- if (!skipErrorConfig) {
- throwable = e;
- break;
- }
- }
+ // Compute effective config changes (merge with initial configs)
+ Map<String, String> effectiveChanges =
+ computeEffectiveChanges(newDynamicConfigs, skipErrorConfig);
+
+ // Early return if no effective changes
+ if (effectiveChanges.isEmpty()) {
+ return;
+ }
+
+ // Build new configuration by merging initial + dynamic configs
+ Map<String, String> newConfigMap = buildConfigMap(effectiveChanges);
+ Configuration newConfig = Configuration.fromMap(newConfigMap);
+
+ // Apply changes to all registered ServerReconfigurable instances
+ applyToServerReconfigurables(newConfig, skipErrorConfig);
+
+ // Update internal state
+ updateInternalState(newConfig, newConfigMap, newDynamicConfigs);
+ LOG.info("Dynamic configs changed: {}", newDynamicConfigs);
Review Comment:
I think logging the the chagned configs `effectiveChanges` is more helpful
than the full set `newDynamicConfigs`.
##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java:
##########
@@ -115,68 +148,296 @@ boolean isAllowedConfig(String key) {
private void updateCurrentConfig(Map<String, String> newDynamicConfigs,
boolean skipErrorConfig)
throws Exception {
- Map<String, String> newProps = new HashMap<>(initialConfigMap);
- overrideProps(newProps, newDynamicConfigs);
- Configuration newConfig = Configuration.fromMap(newProps);
- Configuration oldConfig = currentConfig;
- Set<ServerReconfigurable> appliedServerReconfigurableSet = new
HashSet<>();
- if (!newProps.equals(currentConfigMap)) {
- serverReconfigures
- .values()
- .forEach(
- serverReconfigurable -> {
- try {
- serverReconfigurable.validate(newConfig);
- } catch (ConfigException e) {
- LOG.error(
- "Validate new dynamic config error
and will roll back all the applied config.",
- e);
- if (!skipErrorConfig) {
- throw e;
- }
- }
- });
-
- Exception throwable = null;
- for (ServerReconfigurable serverReconfigurable :
serverReconfigures.values()) {
- try {
- serverReconfigurable.reconfigure(newConfig);
- appliedServerReconfigurableSet.add(serverReconfigurable);
- } catch (ConfigException e) {
- LOG.error(
- "Apply new dynamic error and will roll back all
the applied config.",
- e);
- if (!skipErrorConfig) {
- throwable = e;
- break;
- }
- }
+ // Compute effective config changes (merge with initial configs)
+ Map<String, String> effectiveChanges =
+ computeEffectiveChanges(newDynamicConfigs, skipErrorConfig);
+
+ // Early return if no effective changes
+ if (effectiveChanges.isEmpty()) {
+ return;
Review Comment:
It would be helpful to log an info message when no changes are detected.
This aids debugging, for example, when an `ALTER CONFIG` command appears to
have no effect.
##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java:
##########
@@ -115,68 +148,296 @@ boolean isAllowedConfig(String key) {
private void updateCurrentConfig(Map<String, String> newDynamicConfigs,
boolean skipErrorConfig)
throws Exception {
- Map<String, String> newProps = new HashMap<>(initialConfigMap);
- overrideProps(newProps, newDynamicConfigs);
- Configuration newConfig = Configuration.fromMap(newProps);
- Configuration oldConfig = currentConfig;
- Set<ServerReconfigurable> appliedServerReconfigurableSet = new
HashSet<>();
- if (!newProps.equals(currentConfigMap)) {
- serverReconfigures
- .values()
- .forEach(
- serverReconfigurable -> {
- try {
- serverReconfigurable.validate(newConfig);
- } catch (ConfigException e) {
- LOG.error(
- "Validate new dynamic config error
and will roll back all the applied config.",
- e);
- if (!skipErrorConfig) {
- throw e;
- }
- }
- });
-
- Exception throwable = null;
- for (ServerReconfigurable serverReconfigurable :
serverReconfigures.values()) {
- try {
- serverReconfigurable.reconfigure(newConfig);
- appliedServerReconfigurableSet.add(serverReconfigurable);
- } catch (ConfigException e) {
- LOG.error(
- "Apply new dynamic error and will roll back all
the applied config.",
- e);
- if (!skipErrorConfig) {
- throwable = e;
- break;
- }
- }
+ // Compute effective config changes (merge with initial configs)
+ Map<String, String> effectiveChanges =
+ computeEffectiveChanges(newDynamicConfigs, skipErrorConfig);
+
+ // Early return if no effective changes
+ if (effectiveChanges.isEmpty()) {
+ return;
+ }
+
+ // Build new configuration by merging initial + dynamic configs
+ Map<String, String> newConfigMap = buildConfigMap(effectiveChanges);
+ Configuration newConfig = Configuration.fromMap(newConfigMap);
+
+ // Apply changes to all registered ServerReconfigurable instances
+ applyToServerReconfigurables(newConfig, skipErrorConfig);
+
+ // Update internal state
+ updateInternalState(newConfig, newConfigMap, newDynamicConfigs);
+ LOG.info("Dynamic configs changed: {}", newDynamicConfigs);
+ }
+
+ /**
+ * Computes effective config changes by validating new configs and
handling deletions.
+ *
+ * @param newDynamicConfigs new dynamic configs from ZooKeeper
+ * @param skipErrorConfig whether to skip invalid configs
+ * @return map of config changes that passed validation
+ * @throws ConfigException if validation fails and skipErrorConfig is false
+ */
+ private Map<String, String> computeEffectiveChanges(
+ Map<String, String> newDynamicConfigs, boolean skipErrorConfig)
throws ConfigException {
+ Map<String, String> effectiveChanges = new HashMap<>();
+ Set<String> skippedConfigs = new HashSet<>();
+
+ // Process deleted configs: restore to initial value or remove
+ processDeletions(newDynamicConfigs, effectiveChanges, skippedConfigs,
skipErrorConfig);
+
+ // Process added/modified configs
+ processModifications(newDynamicConfigs, effectiveChanges,
skippedConfigs, skipErrorConfig);
+
+ if (!skippedConfigs.isEmpty()) {
+ LOG.warn("Skipped invalid configs: {}", skippedConfigs);
+ }
+
+ return effectiveChanges;
+ }
+
+ /** Processes config deletions by restoring to initial values or removing
them. */
+ private void processDeletions(
+ Map<String, String> newDynamicConfigs,
+ Map<String, String> effectiveChanges,
+ Set<String> skippedConfigs,
+ boolean skipErrorConfig)
+ throws ConfigException {
+ for (String configKey : dynamicConfigs.keySet()) {
+ if (newDynamicConfigs.containsKey(configKey)) {
+ continue; // Not deleted
+ }
+
+ String currentValue = currentConfigMap.get(configKey);
+ String initialValue = initialConfigMap.get(configKey);
+
+ // Determine target value: initial value or null (removal)
+ String targetValue = initialValue;
+
+ // Skip if no change needed (already at initial value)
+ if (Objects.equals(currentValue, targetValue)) {
+ continue;
+ }
+
+ // Validate the change
+ if (validateConfigChange(
+ configKey, currentValue, targetValue, skippedConfigs,
skipErrorConfig)) {
+ effectiveChanges.put(configKey, targetValue);
}
+ }
+ }
+
+ /** Processes config additions and modifications. */
+ private void processModifications(
+ Map<String, String> newDynamicConfigs,
+ Map<String, String> effectiveChanges,
+ Set<String> skippedConfigs,
+ boolean skipErrorConfig)
+ throws ConfigException {
+ for (Map.Entry<String, String> entry : newDynamicConfigs.entrySet()) {
+ String configKey = entry.getKey();
+ String newValue = entry.getValue();
+ String currentValue = currentConfigMap.get(configKey);
- // rollback to old config if there is an error.
- if (throwable != null) {
- appliedServerReconfigurableSet.forEach(
- serverReconfigurable ->
serverReconfigurable.reconfigure(oldConfig));
- throw throwable;
+ // Skip if value unchanged
+ if (Objects.equals(currentValue, newValue)) {
+ continue;
}
- currentConfig = newConfig;
- currentConfigMap.clear();
- dynamicConfigs.clear();
- currentConfigMap.putAll(newProps);
- dynamicConfigs.putAll(newDynamicConfigs);
- LOG.info("Dynamic configs changed: {}", newDynamicConfigs);
+ // Validate and add to effective changes
+ if (validateConfigChange(
+ configKey, currentValue, newValue, skippedConfigs,
skipErrorConfig)) {
+ effectiveChanges.put(configKey, newValue);
+ }
}
}
- private void overrideProps(Map<String, String> props, Map<String, String>
propsOverride) {
- propsOverride.forEach(
+ /**
+ * Validates a single config change.
+ *
+ * @return true if validation passed, false if skipped due to error
+ * @throws ConfigException if validation fails and skipErrorConfig is false
+ */
+ private boolean validateConfigChange(
+ String configKey,
+ String oldValue,
+ String newValue,
+ Set<String> skippedConfigs,
+ boolean skipErrorConfig)
+ throws ConfigException {
+ try {
+ validateSingleConfig(configKey, oldValue, newValue);
+ return true;
+ } catch (ConfigException e) {
+ LOG.error(
+ "Config validation failed for '{}': {} -> {}. {}",
+ configKey,
+ oldValue,
+ newValue,
+ e.getMessage());
+ if (skipErrorConfig) {
+ skippedConfigs.add(configKey);
+ return false;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /** Builds final config map by merging initial configs with effective
changes. */
+ private Map<String, String> buildConfigMap(Map<String, String>
effectiveChanges) {
+ Map<String, String> configMap = new HashMap<>(initialConfigMap);
+ effectiveChanges.forEach(
(key, value) -> {
if (value == null) {
- props.remove(key);
+ configMap.remove(key);
} else {
- props.put(key, value);
+ configMap.put(key, value);
}
});
+ return configMap;
+ }
+
+ /** Updates internal state after successful reconfiguration. */
+ private void updateInternalState(
+ Configuration newConfig,
+ Map<String, String> newConfigMap,
+ Map<String, String> newDynamicConfigs) {
+ currentConfig = newConfig;
+ currentConfigMap.clear();
+ currentConfigMap.putAll(newConfigMap);
+ dynamicConfigs.clear();
+ dynamicConfigs.putAll(newDynamicConfigs);
+ }
+
+ /**
+ * Validates a single config entry including type parsing and business
validation.
+ *
+ * @param configKey config key
+ * @param oldValueStr old value string
+ * @param newValueStr new value string
+ * @throws ConfigException if validation fails
+ */
+ private void validateSingleConfig(String configKey, String oldValueStr,
String newValueStr)
+ throws ConfigException {
+ // Get ConfigOption for type information
+ ConfigOption<?> configOption =
ConfigOptions.getConfigOption(configKey);
+
+ // For configs with allowed prefixes (like "datalake."), skip
ConfigOption validation
+ // and rely on ServerReconfigurable's business validation
+ boolean hasPrefixConfig = false;
+ for (String prefix : ALLOWED_CONFIG_PREFIXES) {
+ if (configKey.startsWith(prefix)) {
+ hasPrefixConfig = true;
+ break;
+ }
+ }
+
+ if (configOption == null && !hasPrefixConfig) {
+ throw new ConfigException(
+ String.format("No ConfigOption found for config key: %s",
configKey));
+ }
+
+ // Parse and validate type only if ConfigOption exists
+ Object newValue = null;
+ if (configOption != null && newValueStr != null) {
+ Configuration tempConfig = new Configuration();
+ tempConfig.setString(configKey, newValueStr);
+ try {
+ newValue = tempConfig.getOptional(configOption).get();
+ } catch (Exception e) {
+ throw new ConfigException(
+ String.format(
+ "Cannot parse '%s' as %s for config '%s'",
+ newValueStr,
+ configOption.isList()
+ ? "List<" +
configOption.getClazz().getSimpleName() + ">"
+ :
configOption.getClazz().getSimpleName(),
+ configKey),
+ e);
Review Comment:
Add the exception message into `ConfigException` top error message, because
`ApiException` will truncate the exception stack and will hidden the root
cause.
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java:
##########
@@ -73,29 +74,41 @@ public class RocksDBResourceContainer implements
AutoCloseable {
private final boolean enableStatistics;
+ /** The shared rate limiter for all RocksDB instances. */
+ private final RateLimiter sharedRateLimiter;
+
/** The handles to be closed when the container is closed. */
private final ArrayList<AutoCloseable> handlesToClose;
@VisibleForTesting
RocksDBResourceContainer() {
- this(new Configuration(), null, false);
+ this(new Configuration(), null, false, null);
}
public RocksDBResourceContainer(ReadableConfig configuration, @Nullable
File instanceBasePath) {
- this(configuration, instanceBasePath, false);
+ this(configuration, instanceBasePath, false, null);
}
public RocksDBResourceContainer(
ReadableConfig configuration,
@Nullable File instanceBasePath,
boolean enableStatistics) {
+ this(configuration, instanceBasePath, enableStatistics, null);
Review Comment:
**Nit:** It would be better to use a default `RateLimiter` instance instead
of `null`, and add a `checkNotNull` guard for the `sharedRateLimiter` parameter
in the constructor.
Currently, many places still pass `null` for `sharedRateLimiter`. While this
is currently used test-only, it risks leaking into production code and could
introduce subtle bugs in the future. Using a default rate limiter would make
the API safer and more robust.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]