linliu-code commented on code in PR #13953:
URL: https://github.com/apache/hudi/pull/13953#discussion_r2380033880
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1851,42 +1851,66 @@ public void testNullSchemaProvider() {
}
@Test
- public void testPayloadClassUpdate() throws Exception {
- String dataSetBasePath = basePath +
"/test_dataset_mor_payload_class_update";
+ public void testPartialPayloadClass() throws Exception {
+ String dataSetBasePath = basePath + "/test_dataset_mor";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
- true, false, null, "MERGE_ON_READ");
+ true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
assertRecordCount(1000, dataSetBasePath, sqlContext);
+
+ //now assert that hoodie.properties file now has updated payload class name
HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc,
dataSetBasePath, false);
assertEquals(metaClient.getTableConfig().getPayloadClass(),
DefaultHoodieRecordPayload.class.getName());
+ }
- //now create one more deltaStreamer instance and update payload class
- cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
- Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
- true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
- new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
-
- // NOTE: Payload class cannot be updated.
- metaClient = HoodieTableMetaClient.reload(metaClient);
- assertEquals(metaClient.getTableConfig().getPayloadClass(),
DefaultHoodieRecordPayload.class.getName());
+ private static Stream<Arguments> getArgsForMergeRelatedPropertiesUpdate() {
+ return Stream.of(
+ Arguments.of(true, DummyAvroPayload.class.getName(), null, null, null,
null, HoodieTableConfig.PAYLOAD_CLASS_NAME.key()),
+ Arguments.of(false, null, null, null, RecordMergeMode.CUSTOM,
HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID,
HoodieTableConfig.RECORD_MERGE_MODE.key()),
+ Arguments.of(false, null, RecordMergeMode.CUSTOM, "strategy_id_1",
RecordMergeMode.CUSTOM, "strategy_id_2",
HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key())
+ );
}
- @Test
- public void testPartialPayloadClass() throws Exception {
- String dataSetBasePath = basePath + "/test_dataset_mor";
+ @ParameterizedTest
+ @MethodSource("getArgsForMergeRelatedPropertiesUpdate")
+ public void testMergeRelatedPropertiesUpdate(
+ Boolean updatePayloadClass,
+ String updatedPayloadClassName,
+ RecordMergeMode firstMergeMode,
+ String firstStrategyId,
+ RecordMergeMode secondMergeMode,
+ String secondStrategyId,
+ String exceptionKey
+ ) throws Exception {
+ String dataSetBasePath = basePath + "/test_dataset_mor_merge_update";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
- true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ");
+ true, false, null, "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ if (null != firstMergeMode) {
+ cfg.recordMergeMode = firstMergeMode;
+ cfg.recordMergeStrategyId = firstStrategyId;
+ }
assertRecordCount(1000, dataSetBasePath, sqlContext);
-
- //now assert that hoodie.properties file now has updated payload class name
HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc,
dataSetBasePath, false);
assertEquals(metaClient.getTableConfig().getPayloadClass(),
DefaultHoodieRecordPayload.class.getName());
+
+ //now create one more deltaStreamer instance and update payload class
+ HoodieDeltaStreamer.Config updatedConfig =
TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
+ Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
+ true, updatePayloadClass, updatedPayloadClassName, "MERGE_ON_READ");
+ if (null != secondMergeMode) {
+ updatedConfig.recordMergeMode = secondMergeMode;
+ updatedConfig.recordMergeStrategyId = secondStrategyId;
+ }
+ Exception e = assertThrows(HoodieException.class, () -> {
Review Comment:
We haven't added the check yet, so how can the error be thrown here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]