yihua commented on code in PR #13953:
URL: https://github.com/apache/hudi/pull/13953#discussion_r2366630290
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1908,19 +1932,15 @@ public void testPayloadClassUpdateWithCOWTable() throws
Exception {
assertTrue(props.containsKey(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key()));
//now create one more deltaStreamer instance and update payload class
- cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
+ HoodieDeltaStreamer.Config updatedConfig =
TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
true, true, DummyAvroPayload.class.getName(), null);
- new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
-
- props = new Properties();
- fs = HadoopFSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration());
- try (InputStream inputStream = fs.open(new Path(metaPath))) {
- props.load(inputStream);
- }
- //now using payload
- assertEquals(DummyAvroPayload.class.getName(),
props.get(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
+ Exception e = assertThrows(HoodieException.class, () -> {
+ new HoodieDeltaStreamer(updatedConfig, jsc, fs,
hiveServer.getHiveConf());
+ }, "Should error out when payload class is switched");
+ assertTrue(e.getMessage().contains("Config conflict(key"));
+
assertTrue(e.getMessage().contains(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
Review Comment:
Could we assert on the full error message instead of only on fragments of it?
Checking partial substrings is confusing.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java:
##########
@@ -163,6 +163,15 @@ public HoodieStreamer(Config cfg, JavaSparkContext jssc,
FileSystem fs, Configur
cfg.recordMergeMode = mergingConfigs.getLeft();
cfg.payloadClassName = mergingConfigs.getMiddle();
cfg.recordMergeStrategyId = mergingConfigs.getRight();
+ if (null != cfg.recordMergeMode &&
!StringUtils.isNullOrEmpty(cfg.recordMergeMode.name())) {
+ this.properties.put(HoodieTableConfig.RECORD_MERGE_MODE.key(),
cfg.recordMergeMode.name());
+ }
+ if (!StringUtils.isNullOrEmpty(cfg.payloadClassName)) {
+ this.properties.put(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
cfg.payloadClassName);
+ }
+ if (!StringUtils.isNullOrEmpty(cfg.recordMergeStrategyId)) {
+ this.properties.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(),
cfg.recordMergeStrategyId);
Review Comment:
Should this use the write config key or the corresponding table config key?
Do the Spark datasource and SQL writers fail validation in the same place
if the merge (write or table) configs are changed?
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1851,42 +1851,66 @@ public void testNullSchemaProvider() {
}
@Test
- public void testPayloadClassUpdate() throws Exception {
- String dataSetBasePath = basePath +
"/test_dataset_mor_payload_class_update";
+ public void testPartialPayloadClass() throws Exception {
+ String dataSetBasePath = basePath + "/test_dataset_mor";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
- true, false, null, "MERGE_ON_READ");
+ true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
assertRecordCount(1000, dataSetBasePath, sqlContext);
+
+ //now assert that hoodie.properties file now has updated payload class name
HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc,
dataSetBasePath, false);
assertEquals(metaClient.getTableConfig().getPayloadClass(),
DefaultHoodieRecordPayload.class.getName());
+ }
- //now create one more deltaStreamer instance and update payload class
- cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
- Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
- true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
- new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
-
- // NOTE: Payload class cannot be updated.
- metaClient = HoodieTableMetaClient.reload(metaClient);
- assertEquals(metaClient.getTableConfig().getPayloadClass(),
DefaultHoodieRecordPayload.class.getName());
+ private static Stream<Arguments> getArgsForMergeRelatedPropertiesUpdate() {
+ return Stream.of(
+ Arguments.of(true, DummyAvroPayload.class.getName(), null, null, null,
null, HoodieTableConfig.PAYLOAD_CLASS_NAME.key()),
+ Arguments.of(false, null, null, null, RecordMergeMode.CUSTOM,
HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID,
HoodieTableConfig.RECORD_MERGE_MODE.key()),
+ Arguments.of(false, null, RecordMergeMode.CUSTOM, "strategy_id_1",
RecordMergeMode.CUSTOM, "strategy_id_2",
HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key())
+ );
}
- @Test
- public void testPartialPayloadClass() throws Exception {
- String dataSetBasePath = basePath + "/test_dataset_mor";
+ @ParameterizedTest
+ @MethodSource("getArgsForMergeRelatedPropertiesUpdate")
+ public void testMergeRelatedPropertiesUpdate(
+ Boolean updatePayloadClass,
+ String updatedPayloadClassName,
+ RecordMergeMode firstMergeMode,
+ String firstStrategyId,
+ RecordMergeMode secondMergeMode,
+ String secondStrategyId,
+ String exceptionKey
+ ) throws Exception {
+ String dataSetBasePath = basePath + "/test_dataset_mor_merge_update";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
- true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ");
+ true, false, null, "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ if (null != firstMergeMode) {
+ cfg.recordMergeMode = firstMergeMode;
+ cfg.recordMergeStrategyId = firstStrategyId;
+ }
assertRecordCount(1000, dataSetBasePath, sqlContext);
-
- //now assert that hoodie.properties file now has updated payload class name
HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc,
dataSetBasePath, false);
assertEquals(metaClient.getTableConfig().getPayloadClass(),
DefaultHoodieRecordPayload.class.getName());
+
+ //now create one more deltaStreamer instance and update payload class
+ HoodieDeltaStreamer.Config updatedConfig =
TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
+ Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
+ true, updatePayloadClass, updatedPayloadClassName, "MERGE_ON_READ");
+ if (null != secondMergeMode) {
+ updatedConfig.recordMergeMode = secondMergeMode;
+ updatedConfig.recordMergeStrategyId = secondStrategyId;
+ }
+ Exception e = assertThrows(HoodieException.class, () -> {
+ new HoodieDeltaStreamer(updatedConfig, jsc, fs,
hiveServer.getHiveConf());
+ }, "Should error out when merge mode is switched");
+ assertTrue(e.getMessage().contains("Config conflict(key"));
+ assertTrue(e.getMessage().contains(exceptionKey));
Review Comment:
Same here: please assert on the full error message.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java:
##########
@@ -89,8 +90,8 @@ private HoodieDeltaStreamer.Config createConfig(String
basePath, String sourceCh
false,
false,
100000,
- false,
- null,
+ true,
+ HoodieAvroPayload.class.getName(),
Review Comment:
Is `HoodieAvroPayload` used as the payload?
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1851,42 +1851,66 @@ public void testNullSchemaProvider() {
}
@Test
- public void testPayloadClassUpdate() throws Exception {
- String dataSetBasePath = basePath +
"/test_dataset_mor_payload_class_update";
+ public void testPartialPayloadClass() throws Exception {
Review Comment:
nit: doesn't the original name still work? Also, should we keep only one test,
parameterized on table type, by merging `testPartialPayloadClass` (on MOR)
and `testPayloadClassUpdateWithCOWTable` (on COW)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]