the-other-tim-brown commented on code in PR #13964:
URL: https://github.com/apache/hudi/pull/13964#discussion_r2373816071
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -911,8 +911,9 @@ private WriteClientWriteResult writeToSink(InputBatch inputBatch, String instant
         BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient);
         writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, !HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
       } else {
-        HoodieRecordType recordType = createRecordMerger(props).getRecordType();
-        Option<JavaRDD<HoodieRecord>> recordsOption = HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), inputBatch.getSchemaProvider(),
+        TypedProperties mergeProps =  ConfigUtils.getMergeProps(props, metaClient.getTableConfig());
Review Comment:
extra space between `=` and `ConfigUtils.getMergeProps(...)`
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -508,11 +508,16 @@ class HoodieSparkSqlWriterInternal {
           throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
         }
         instantTime = client.startCommit(commitActionType)
+        // if table has undergone upgrade, we need to reload table config
+        tableConfig = HoodieTableMetaClient.builder()
Review Comment:
Can we use `tableMetaClient.reloadTableConfig()` for this instead?
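To illustrate the suggestion, here is a minimal, self-contained sketch of why reloading through the existing handle is preferable to rebuilding a new meta client: after an upgrade rewrites the persisted config, the cached copy goes stale and a reload method refreshes it in place. The `MetaClient`/`TableConfig` classes below are hypothetical stand-ins, not Hudi's actual `HoodieTableMetaClient` API.

```java
import java.util.HashMap;
import java.util.Map;

class TableConfig {
  final Map<String, String> props;
  TableConfig(Map<String, String> props) { this.props = new HashMap<>(props); }
}

class MetaClient {
  private final Map<String, String> storage; // simulates config persisted on storage
  private TableConfig cached;
  MetaClient(Map<String, String> storage) {
    this.storage = storage;
    this.cached = new TableConfig(storage);
  }
  // Re-reads the table config from storage and replaces the cached copy,
  // without constructing a whole new client.
  TableConfig reloadTableConfig() {
    cached = new TableConfig(storage);
    return cached;
  }
  TableConfig getTableConfig() { return cached; }
}

public class ReloadDemo {
  public static void main(String[] args) {
    Map<String, String> storage = new HashMap<>();
    storage.put("hoodie.table.version", "6");
    MetaClient metaClient = new MetaClient(storage);

    // An upgrade rewrites the persisted config behind our back.
    storage.put("hoodie.table.version", "8");

    // The cached copy is stale until we reload through the existing handle.
    System.out.println(metaClient.getTableConfig().props.get("hoodie.table.version"));
    System.out.println(metaClient.reloadTableConfig().props.get("hoodie.table.version"));
  }
}
```

Reusing the existing handle also avoids re-resolving storage paths and listing the timeline, which a fresh builder would redo.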
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecords.java:
##########
@@ -48,6 +48,24 @@ public static <T> BufferedRecord<T> fromHoodieRecord(HoodieRecord record, Schema
     return new BufferedRecord<>(recordKey, recordContext.convertOrderingValueToEngineType(orderingValue), data, schemaId, inferOperation(isDelete, record.getOperation()));
   }
+  public static <T> BufferedRecord<T> fromHoodieRecordWithDeflatedRecord(HoodieRecord record, Schema schema, RecordContext<T> recordContext, Properties props,
+                                                                         String[] orderingFields, DeleteContext deleteContext) {
+    HoodieOperation hoodieOperation = record.getIgnoreIndexUpdate() ? HoodieOperation.UPDATE_BEFORE : record.getOperation();
Review Comment:
This needs to use `inferOperation` to ensure consistency with the current path. That may explain the Flink failure.
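As a simplified, self-contained sketch of the consistency concern: the existing path funnels `(isDelete, record operation)` through one helper, so the delete flag always wins, while the new code branches only on `ignoreIndexUpdate` and can let a stale operation leak through. The `inferOperation` below is a hypothetical simplification for illustration, not Hudi's actual implementation.

```java
// Mirrors the shape of Hudi's HoodieOperation enum for this sketch only.
enum HoodieOperation { INSERT, UPDATE_BEFORE, DELETE }

public class InferOperationDemo {
  // Single source of truth: the delete flag overrides the record's stated operation.
  static HoodieOperation inferOperation(boolean isDelete, HoodieOperation fromRecord) {
    return isDelete ? HoodieOperation.DELETE : fromRecord;
  }

  public static void main(String[] args) {
    // A delete record whose payload still says INSERT: the helper normalizes it.
    System.out.println(inferOperation(true, HoodieOperation.INSERT));
    // Non-delete records keep their stated operation.
    System.out.println(inferOperation(false, HoodieOperation.UPDATE_BEFORE));
  }
}
```

Branching on a different flag and skipping this normalization would pass `INSERT` through for a delete, exactly the kind of divergence that surfaces as an engine-specific (e.g. Flink) test failure.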
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java:
##########
@@ -168,7 +168,8 @@ void readWithEventTimeOrderingWithRecords() throws IOException {
     // update for 4 is ignored due to lower ordering value.
     // record5 is deleted.
     // delete for 6 is ignored due to lower ordering value.
-    assertEquals(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4, testRecord6, testRecord7), actualRecords);
+    assertEquals(Arrays.asList(getSerializableIndexedRecord(testRecord1UpdateWithSameTime), getSerializableIndexedRecord(testRecord2Update),
Review Comment:
nitpick: you can use `Stream.of(...).map(getSerializableIndexedRecord)...`
to make it a bit less verbose here and below
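A runnable sketch of the pattern being suggested, with `wrap` as a hypothetical stand-in for `getSerializableIndexedRecord` and strings standing in for the test records:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamMapDemo {
  // Hypothetical stand-in for getSerializableIndexedRecord.
  static String wrap(String record) { return "wrapped(" + record + ")"; }

  public static void main(String[] args) {
    // Instead of Arrays.asList(wrap(r1), wrap(r2), wrap(r3), ...),
    // map the wrapper once over all records:
    List<String> expected = Stream.of("record1", "record2", "record3")
        .map(StreamMapDemo::wrap)
        .collect(Collectors.toList());
    System.out.println(expected);
  }
}
```

Applying the wrapper via a method reference keeps each record name on one line and avoids repeating the call six times.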
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java:
##########
@@ -462,7 +462,7 @@ public void testStopWithSavepointAndRestore() throws Exception {
     }
   }
-  @Test
+  // @Test disabling due to flakiness
Review Comment:
If this needs to be disabled, let's use `@Disabled("GH-XYZ")` with a tracking issue rather than commenting out the annotation
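For reference, the suggestion in JUnit 5 form looks roughly like the fragment below (the issue id is a placeholder, not a real ticket):

```java
// @Disabled keeps the test visible as skipped in reports, unlike a
// commented-out @Test, and the reason string links it to a tracking issue.
@Disabled("GH-XYZ: flaky, see tracking issue")
@Test
public void testStopWithSavepointAndRestore() throws Exception {
  // ...
}
```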
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]