danny0405 commented on a change in pull request #2430:
URL: https://github.com/apache/hudi/pull/2430#discussion_r556387652
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -122,21 +193,62 @@ public static HoodieRecordPayload createPayload(String
payloadClass, GenericReco
}
}
- public static HoodieWriteConfig
getHoodieClientConfig(HoodieFlinkStreamer.Config cfg) {
- FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf());
+ /**
+ * Creates a payload class via reflection; does not require an ordering/precombine value.
+ */
+ public static HoodieRecordPayload createPayload(String payloadClass,
GenericRecord record)
+ throws IOException {
+ try {
+ return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
+ new Class<?>[] {Option.class}, Option.of(record));
+ } catch (Throwable e) {
+ throw new IOException("Could not create payload for class: " +
payloadClass, e);
+ }
+ }
+
+ public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
HoodieWriteConfig.Builder builder =
-
HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath(cfg.targetBasePath).combineInput(cfg.filterDupes,
true)
-
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
-
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
- .build())
- .forTable(cfg.targetTableName)
- .withAutoCommit(false)
- .withProps(readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
- .getConfig());
-
- builder = builder.withSchema(new
FilebasedSchemaProvider(getProps(cfg)).getTargetSchema().toString());
- HoodieWriteConfig config = builder.build();
- return config;
+ HoodieWriteConfig.newBuilder()
+ .withEngineType(EngineType.FLINK)
+ .withPath(conf.getString(HoodieOptions.PATH))
+
.combineInput(conf.getBoolean(HoodieOptions.INSERT_DROP_DUPS), true)
+ .withCompactionConfig(
+ HoodieCompactionConfig.newBuilder()
+
.withPayloadClass(conf.getString(HoodieOptions.PAYLOAD_CLASS))
+ .build())
+ .forTable(conf.getString(HoodieOptions.TABLE_NAME))
+ .withAutoCommit(false)
+
.withProps(flinkConf2TypedProperties(HoodieOptions.flatOptions(conf)));
+
+ builder = builder.withSchema(getSourceSchema(conf).toString());
+ return builder.build();
+ }
+
+ /**
+ * Converts the given {@link Configuration} to {@link TypedProperties}.
+ * The default values are also set up.
+ *
+ * @param conf The flink configuration
+ * @return a TypedProperties instance
+ */
+ public static TypedProperties flinkConf2TypedProperties(Configuration conf) {
+ Properties properties = new Properties();
+ // put all the set up options
+ conf.addAllToProperties(properties);
+ // put all the default options
+ for (ConfigOption<?> option : HoodieOptions.OPTIONAL_OPTIONS) {
+ if (!conf.contains(option)) {
+ properties.put(option.key(), option.defaultValue());
+ }
+ }
+ return new TypedProperties(properties);
}
+ public static void checkRequiredProperties(TypedProperties props,
List<String> checkPropNames) {
+ checkPropNames.forEach(prop -> {
+ if (!props.containsKey(prop)) {
+ throw new HoodieNotSupportedException("Required property " + prop + "
is missing");
Review comment:
I just copied it from the old code; I will use `Preconditions.checkState` instead.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]