xushiyan commented on code in PR #7336:
URL: https://github.com/apache/hudi/pull/7336#discussion_r1238666276
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java:
##########
@@ -87,54 +92,91 @@ public WriteStatus() {
* Mark write as success, optionally using given parameters for the purpose
of calculating some aggregate metrics.
* This method is not meant to cache passed arguments, since WriteStatus
objects are collected in Spark Driver.
*
- * @param record deflated {@code HoodieRecord} containing information that
uniquely identifies it.
+ * @param record deflated {@code HoodieRecord} containing
information that uniquely identifies it.
* @param optionalRecordMetadata optional metadata related to data contained
in {@link HoodieRecord} before deflation.
*/
public void markSuccess(HoodieRecord record, Option<Map<String, String>>
optionalRecordMetadata) {
if (trackSuccessRecords) {
- writtenRecords.add(record);
+
writtenRecordDelegates.add(HoodieRecordDelegate.fromHoodieRecord(record));
}
+ updateStatsForSuccess(optionalRecordMetadata);
+ }
+
+ /**
+ * Used by native write handles like HoodieRowCreateHandle and
HoodieRowDataCreateHandle.
+ *
+ * @see WriteStatus#markSuccess(HoodieRecord, Option)
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public void markSuccess(HoodieRecordDelegate recordDelegate,
Option<Map<String, String>> optionalRecordMetadata) {
+ if (trackSuccessRecords) {
+ writtenRecordDelegates.add(Objects.requireNonNull(recordDelegate));
+ }
+ updateStatsForSuccess(optionalRecordMetadata);
+ }
+
+ private void updateStatsForSuccess(Option<Map<String, String>>
optionalRecordMetadata) {
totalRecords++;
// get the min and max event time for calculating latency and freshness
- if (optionalRecordMetadata.isPresent()) {
- String eventTimeVal =
optionalRecordMetadata.get().getOrDefault(METADATA_EVENT_TIME_KEY, null);
- try {
- if (!StringUtils.isNullOrEmpty(eventTimeVal)) {
- int length = eventTimeVal.length();
- long millisEventTime;
- // eventTimeVal in seconds unit
- if (length == 10) {
- millisEventTime = Long.parseLong(eventTimeVal) * 1000;
- } else if (length == 13) {
- // eventTimeVal in millis unit
- millisEventTime = Long.parseLong(eventTimeVal);
- } else {
- throw new IllegalArgumentException("not support event_time
format:" + eventTimeVal);
- }
- long eventTime =
DateTimeUtils.parseDateTime(Long.toString(millisEventTime)).toEpochMilli();
- stat.setMinEventTime(eventTime);
- stat.setMaxEventTime(eventTime);
- }
- } catch (DateTimeException | IllegalArgumentException e) {
- LOG.debug(String.format("Fail to parse event time value: %s",
eventTimeVal), e);
+ String eventTimeVal = optionalRecordMetadata.orElse(Collections.emptyMap())
+ .getOrDefault(METADATA_EVENT_TIME_KEY, null);
+ if (isNullOrEmpty(eventTimeVal)) {
+ return;
+ }
+ try {
+ int length = eventTimeVal.length();
+ long millisEventTime;
+ // eventTimeVal in seconds unit
+ if (length == 10) {
+ millisEventTime = Long.parseLong(eventTimeVal) * 1000;
+ } else if (length == 13) {
+ // eventTimeVal in millis unit
+ millisEventTime = Long.parseLong(eventTimeVal);
+ } else {
+ throw new IllegalArgumentException("not support event_time format:" +
eventTimeVal);
}
+ long eventTime =
DateTimeUtils.parseDateTime(Long.toString(millisEventTime)).toEpochMilli();
+ stat.setMinEventTime(eventTime);
+ stat.setMaxEventTime(eventTime);
+ } catch (DateTimeException | IllegalArgumentException e) {
+ LOG.debug(String.format("Fail to parse event time value: %s",
eventTimeVal), e);
Review Comment:
This diff comes from flattening the nested condition block into an early
return; the logic remains the same.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]