danny0405 commented on code in PR #5528:
URL: https://github.com/apache/hudi/pull/5528#discussion_r871015960
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -708,32 +709,45 @@ private FlinkOptions() {
// Prefix for Hoodie specific properties.
private static final String PROPERTIES_PREFIX = "properties.";
-
-  /**
-   * Collects the config options that start with 'properties.' into a 'key'='value' list.
-   */
-  public static Map<String, String> getHoodieProperties(Map<String, String> options) {
-    return getHoodiePropertiesWithPrefix(options, PROPERTIES_PREFIX);
-  }
+  private static final String HADOOP_PREFIX = "hadoop.";
+  private static final String PARQUET_PREFIX = "parquet.";

   /**
    * Collects the config options that start with specified prefix {@code prefix} into a 'key'='value' list.
    */
-  public static Map<String, String> getHoodiePropertiesWithPrefix(Map<String, String> options, String prefix) {
+  public static Map<String, String> getPropertiesWithPrefix(Map<String, String> options, String prefix) {
     final Map<String, String> hoodieProperties = new HashMap<>();
-
-    if (hasPropertyOptions(options)) {
+    if (hasPropertyOptions(options, prefix)) {
       options.keySet().stream()
-          .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+          .filter(key -> key.startsWith(prefix))
           .forEach(key -> {
             final String value = options.get(key);
-            final String subKey = key.substring((prefix).length());
+            final String subKey = key.substring(prefix.length());
             hoodieProperties.put(subKey, value);
           });
     }
     return hoodieProperties;
   }
+  public static org.apache.hadoop.conf.Configuration getParquetConf(
+      org.apache.flink.configuration.Configuration options,
+      org.apache.hadoop.conf.Configuration hadoopConf) {
+    org.apache.hadoop.conf.Configuration copy = new org.apache.hadoop.conf.Configuration(hadoopConf);
+    Map<String, String> parquetOptions = getPropertiesWithPrefix(options.toMap(), PARQUET_PREFIX);
+    parquetOptions.forEach((k, v) -> copy.set(PARQUET_PREFIX + k, v));
+    return copy;
+  }
+
+  /**
+   * Create a new hadoop configuration that is initialized with the given flink configuration.
+   */
+  public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration conf) {
+    org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf();
+    Map<String, String> options = getPropertiesWithPrefix(conf.toMap(), HADOOP_PREFIX);
+    options.forEach((k, v) -> hadoopConf.set(k, v));
Review Comment:
Thanks, overall looks good. Can we create a new class named
`HadoopConfigurations` under the package `org.apache.hudi.configuration` and move
the methods `getParquetConf` and `getHadoopConf` there?
Let's keep `FlinkOptions` clean, holding only the Flink configurations.
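For illustration, a minimal self-contained sketch of what the suggested `HadoopConfigurations` helper could look like. Note this is an assumption, not the actual Hudi class: it uses plain `java.util.Map` in place of the real Flink/Hadoop `Configuration` types so it compiles standalone, whereas the real class would keep the diff's signatures and delegate to `FlinkClientUtil.getHadoopConf()`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed org.apache.hudi.configuration.HadoopConfigurations.
// Uses Map<String, String> instead of Flink/Hadoop Configuration types for a runnable example.
public class HadoopConfigurations {
  private static final String HADOOP_PREFIX = "hadoop.";
  private static final String PARQUET_PREFIX = "parquet.";

  // Collects options that start with the given prefix, stripping the prefix from each key.
  public static Map<String, String> getPropertiesWithPrefix(Map<String, String> options, String prefix) {
    Map<String, String> result = new HashMap<>();
    options.forEach((key, value) -> {
      if (key.startsWith(prefix)) {
        result.put(key.substring(prefix.length()), value);
      }
    });
    return result;
  }

  // Extracts the "hadoop."-prefixed options; the real method would copy them into a Hadoop Configuration.
  public static Map<String, String> getHadoopConf(Map<String, String> flinkOptions) {
    return getPropertiesWithPrefix(flinkOptions, HADOOP_PREFIX);
  }

  // Extracts the "parquet."-prefixed options; the real method would keep the "parquet." prefix on the keys.
  public static Map<String, String> getParquetConf(Map<String, String> flinkOptions) {
    return getPropertiesWithPrefix(flinkOptions, PARQUET_PREFIX);
  }

  public static void main(String[] args) {
    Map<String, String> flinkOptions = new HashMap<>();
    flinkOptions.put("hadoop.fs.defaultFS", "hdfs://nn:8020");
    flinkOptions.put("parquet.page.size", "1048576");
    flinkOptions.put("table.type", "MERGE_ON_READ"); // no prefix, should be ignored

    System.out.println(getHadoopConf(flinkOptions).get("fs.defaultFS")); // prints hdfs://nn:8020
  }
}
```

This keeps `FlinkOptions` free of Hadoop/Parquet concerns, as the comment requests, while the prefix-stripping behavior matches the diff's `getPropertiesWithPrefix`.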
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]