wuchong commented on code in PR #2757:
URL: https://github.com/apache/fluss/pull/2757#discussion_r2887289028
##########
fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java:
##########
@@ -77,4 +81,109 @@ static Map<String, ConfigOption<?>>
extractConfigOptions(String prefix) {
}
return options;
}
+
+ public static void validateCoordinatorConfigs(Configuration conf) {
+ validServerConfigs(conf);
+
+ validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
+ validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
+ validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);
+
+ // Validate remote.data.dirs
+ List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
+ for (int i = 0; i < remoteDataDirs.size(); i++) {
+ String remoteDataDir = remoteDataDirs.get(i);
+ try {
+ new FsPath(remoteDataDir);
+ } catch (Exception e) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Invalid remote path for %s at index %d.",
+ ConfigOptions.REMOTE_DATA_DIRS.key(), i),
+ e);
+ }
+ }
+
+ // Validate remote.data.dirs.strategy
+ ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
+ conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
+ if (remoteDataDirStrategy ==
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
+ List<Integer> weights =
conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
+ if (!remoteDataDirs.isEmpty() && !weights.isEmpty()) {
+ if (remoteDataDirs.size() != weights.size()) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "The size of '%s' (%d) must match the size
of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
+ ConfigOptions.REMOTE_DATA_DIRS.key(),
+ remoteDataDirs.size(),
+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
+ weights.size()));
+ }
+ // Validate all weights are positive
+ for (int i = 0; i < weights.size(); i++) {
+ if (weights.get(i) < 0) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "All weights in '%s' must be no less
than 0, but found %d at index %d.",
Review Comment:
The condition `weights.get(i) < 0` should be `weights.get(i) <= 0`, and the
error message should be "must be greater than 0".
##########
fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java:
##########
@@ -409,6 +409,23 @@ public static UUID uuidFromRemoteIndexCacheFileName(String
fileName) {
fileName.substring(fileName.indexOf('_') + 1,
fileName.indexOf('.')));
}
+ //
----------------------------------------------------------------------------------------
+ // Remote Data Paths
+ //
----------------------------------------------------------------------------------------
+
+ /**
+ * Returns the remote root directory path for storing data files.
+ *
+ * <p>The path contract:
+ *
+ * <pre>
+ * {$remote.data.dir}
+ * </pre>
+ */
+ public static FsPath remoteDataDir(Configuration conf) {
+ return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
Review Comment:
I think this method is very confusing: when a user sets `remote.data.dirs`
rather than `remote.data.dir`, this method will fail or return the wrong
result. It seems this method is only used by `FlussConfigUtils` to validate
the path. Could you remove this method and just use `new FsPath` in
`FlussConfigUtils`?
##########
fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java:
##########
@@ -77,4 +81,109 @@ static Map<String, ConfigOption<?>>
extractConfigOptions(String prefix) {
}
return options;
}
+
+ public static void validateCoordinatorConfigs(Configuration conf) {
+ validServerConfigs(conf);
+
+ validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
+ validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
+ validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);
+
+ // Validate remote.data.dirs
+ List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
+ for (int i = 0; i < remoteDataDirs.size(); i++) {
+ String remoteDataDir = remoteDataDirs.get(i);
+ try {
+ new FsPath(remoteDataDir);
+ } catch (Exception e) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Invalid remote path for %s at index %d.",
+ ConfigOptions.REMOTE_DATA_DIRS.key(), i),
+ e);
+ }
+ }
+
+ // Validate remote.data.dirs.strategy
+ ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
+ conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
+ if (remoteDataDirStrategy ==
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
+ List<Integer> weights =
conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
+ if (!remoteDataDirs.isEmpty() && !weights.isEmpty()) {
+ if (remoteDataDirs.size() != weights.size()) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "The size of '%s' (%d) must match the size
of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
+ ConfigOptions.REMOTE_DATA_DIRS.key(),
+ remoteDataDirs.size(),
+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
+ weights.size()));
+ }
+ // Validate all weights are positive
+ for (int i = 0; i < weights.size(); i++) {
+ if (weights.get(i) < 0) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "All weights in '%s' must be no less
than 0, but found %d at index %d.",
+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
+ weights.get(i),
+ i));
+ }
+ }
+ }
+ }
+ }
+
+ public static void validateTabletConfigs(Configuration conf) {
+ validServerConfigs(conf);
+
+ Optional<Integer> serverId =
conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
+ if (!serverId.isPresent()) {
+ throw new IllegalConfigurationException(
+ String.format("Configuration %s must be set.",
ConfigOptions.TABLET_SERVER_ID));
+ }
+ validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0);
+
+ validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1);
+
+ if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() >
Integer.MAX_VALUE) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Invalid configuration for %s, it must be less
than or equal %d bytes.",
+ ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(),
Integer.MAX_VALUE));
+ }
+ }
+
+ /** Validate common server configs. */
+ private static void validServerConfigs(Configuration conf) {
+ if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
+ throw new IllegalConfigurationException(
Review Comment:
We should allow `remote.data.dir` to be empty when `remote.data.dirs` is set.
##########
website/docs/maintenance/configuration.md:
##########
@@ -36,6 +36,9 @@ during the Fluss cluster working.
| default.bucket.number | Integer | 1
| The default number of buckets for a table in Fluss cluster. It's a
cluster-level parameter and all the tables without specifying bucket number in
the cluster will use the value as the bucket number.
|
| default.replication.factor | Integer | 1
| The default replication factor for the log of a table in Fluss
cluster. It's a cluster-level parameter, and all the tables without specifying
replication factor in the cluster will use the value as replication factor.
|
| remote.data.dir | String |
(None)
| The directory used for storing the kv snapshot data files and remote
log for log tiered storage in a Fluss supported filesystem.
|
+| remote.data.dirs | List<String> |
(None)
| The directories used for storing the kv snapshot data files and
remote log for log tiered storage in a Fluss supported filesystem. This should
be a comma-separated list of remote URIs. If not configured, it defaults to the
path specified in `remote.data.dir`. Otherwise, one of the paths from this
configuration will be used.
|
Review Comment:
Explain when a directory is chosen (when new tables/partitions are created)
and which directory will be used (decided by `remote.data.dirs.strategy`), as
well as the relationship between `remote.data.dirs` and `remote.data.dir`
(when to configure which, and what the behavior is when both are configured).
##########
fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java:
##########
@@ -77,4 +81,109 @@ static Map<String, ConfigOption<?>>
extractConfigOptions(String prefix) {
}
return options;
}
+
+ public static void validateCoordinatorConfigs(Configuration conf) {
+ validServerConfigs(conf);
+
+ validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
+ validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
+ validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);
+
+ // Validate remote.data.dirs
+ List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
+ for (int i = 0; i < remoteDataDirs.size(); i++) {
+ String remoteDataDir = remoteDataDirs.get(i);
+ try {
+ new FsPath(remoteDataDir);
+ } catch (Exception e) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Invalid remote path for %s at index %d.",
+ ConfigOptions.REMOTE_DATA_DIRS.key(), i),
+ e);
+ }
+ }
+
+ // Validate remote.data.dirs.strategy
+ ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
+ conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
+ if (remoteDataDirStrategy ==
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
+ List<Integer> weights =
conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
Review Comment:
We should check that `weights` is not empty when the strategy is
`WEIGHTED_ROUND_ROBIN`. Otherwise, the checks inside the
`!remoteDataDirs.isEmpty() && !weights.isEmpty()` code block will be skipped.
##########
fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java:
##########
@@ -77,4 +81,109 @@ static Map<String, ConfigOption<?>>
extractConfigOptions(String prefix) {
}
return options;
}
+
+ public static void validateCoordinatorConfigs(Configuration conf) {
+ validServerConfigs(conf);
+
+ validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
+ validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
+ validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);
+
+ // Validate remote.data.dirs
+ List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
+ for (int i = 0; i < remoteDataDirs.size(); i++) {
+ String remoteDataDir = remoteDataDirs.get(i);
+ try {
+ new FsPath(remoteDataDir);
+ } catch (Exception e) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Invalid remote path for %s at index %d.",
+ ConfigOptions.REMOTE_DATA_DIRS.key(), i),
+ e);
+ }
+ }
+
+ // Validate remote.data.dirs.strategy
+ ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
+ conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
+ if (remoteDataDirStrategy ==
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
+ List<Integer> weights =
conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
+ if (!remoteDataDirs.isEmpty() && !weights.isEmpty()) {
+ if (remoteDataDirs.size() != weights.size()) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "The size of '%s' (%d) must match the size
of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
+ ConfigOptions.REMOTE_DATA_DIRS.key(),
+ remoteDataDirs.size(),
+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
+ weights.size()));
+ }
+ // Validate all weights are positive
+ for (int i = 0; i < weights.size(); i++) {
+ if (weights.get(i) < 0) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "All weights in '%s' must be no less
than 0, but found %d at index %d.",
+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
+ weights.get(i),
+ i));
+ }
+ }
+ }
+ }
+ }
+
+ public static void validateTabletConfigs(Configuration conf) {
+ validServerConfigs(conf);
+
+ Optional<Integer> serverId =
conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
+ if (!serverId.isPresent()) {
+ throw new IllegalConfigurationException(
+ String.format("Configuration %s must be set.",
ConfigOptions.TABLET_SERVER_ID));
+ }
+ validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0);
+
+ validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1);
+
+ if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() >
Integer.MAX_VALUE) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Invalid configuration for %s, it must be less
than or equal %d bytes.",
+ ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(),
Integer.MAX_VALUE));
+ }
+ }
+
+ /** Validate common server configs. */
+ private static void validServerConfigs(Configuration conf) {
Review Comment:
rename to `validateServerConfigs`
##########
fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java:
##########
@@ -77,4 +81,109 @@ static Map<String, ConfigOption<?>>
extractConfigOptions(String prefix) {
}
return options;
}
+
+ public static void validateCoordinatorConfigs(Configuration conf) {
+ validServerConfigs(conf);
+
+ validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
+ validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
+ validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);
+
+ // Validate remote.data.dirs
+ List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
+ for (int i = 0; i < remoteDataDirs.size(); i++) {
+ String remoteDataDir = remoteDataDirs.get(i);
+ try {
+ new FsPath(remoteDataDir);
+ } catch (Exception e) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Invalid remote path for %s at index %d.",
+ ConfigOptions.REMOTE_DATA_DIRS.key(), i),
+ e);
+ }
+ }
+
+ // Validate remote.data.dirs.strategy
+ ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
+ conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
+ if (remoteDataDirStrategy ==
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
+ List<Integer> weights =
conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
+ if (!remoteDataDirs.isEmpty() && !weights.isEmpty()) {
+ if (remoteDataDirs.size() != weights.size()) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "The size of '%s' (%d) must match the size
of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
+ ConfigOptions.REMOTE_DATA_DIRS.key(),
+ remoteDataDirs.size(),
+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
+ weights.size()));
+ }
+ // Validate all weights are positive
+ for (int i = 0; i < weights.size(); i++) {
+ if (weights.get(i) < 0) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "All weights in '%s' must be no less
than 0, but found %d at index %d.",
+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
+ weights.get(i),
+ i));
+ }
+ }
+ }
+ }
+ }
+
+ public static void validateTabletConfigs(Configuration conf) {
+ validServerConfigs(conf);
+
+ Optional<Integer> serverId =
conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
+ if (!serverId.isPresent()) {
+ throw new IllegalConfigurationException(
+ String.format("Configuration %s must be set.",
ConfigOptions.TABLET_SERVER_ID));
+ }
Review Comment:
I think all the configuration validation should be shared between
Coordinator and TabletServer, except for `TABLET_SERVER_ID`. It is possible for
TabletServer to use `remote.data.dirs` rather than being forced to always use
`remote.data.dir`. The same applies to the other configurations. So could you
refactor the config validation to extract the common validation for both
Coordinator and TabletServer?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]