[ https://issues.apache.org/jira/browse/HUDI-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394387#comment-17394387 ]
ASF GitHub Bot commented on HUDI-1771:
--------------------------------------
vinothchandar commented on a change in pull request #3285:
URL: https://github.com/apache/hudi/pull/3285#discussion_r683844723
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
##########
@@ -620,23 +633,31 @@ public boolean reachedEnd() throws IOException {
while (logKeysIterator.hasNext()) {
final String curKey = logKeysIterator.next();
if (!keyToSkip.contains(curKey)) {
- Option<IndexedRecord> insertAvroRecord =
- scanner.getRecords().get(curKey).getData().getInsertValue(tableSchema);
+ Option<IndexedRecord> insertAvroRecord = getInsetValue(curKey);
if (insertAvroRecord.isPresent()) {
// the record is a DELETE if insertAvroRecord not present, skipping
- GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+ GenericRecord avroRecord = buildAvroRecordBySchema(
insertAvroRecord.get(),
requiredSchema,
requiredPos,
recordBuilder);
- this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+ this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
+ this.currentRecord.setRowKind(FormatUtils.getRowKind(insertAvroRecord.get(), this.operationPos));
return false;
}
}
}
return true;
}
+ private Option<IndexedRecord> getInsetValue(String curKey) throws IOException {
Review comment:
Typo: `getInsetValue` should be `getInsertValue`.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
##########
@@ -120,8 +136,10 @@ public static long generateChecksum(byte[] data) {
public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, Pair<String, String> recordKeyPartitionPathPair) {
String recKey = rec.get(recordKeyPartitionPathPair.getLeft()).toString();
String partitionPath = rec.get(recordKeyPartitionPathPair.getRight()).toString();
+ String operation = getNullableValAsString(rec, HoodieRecord.OPERATION_METADATA_FIELD);
Review comment:
use an `Option`?
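A minimal sketch of what the `Option` suggestion could look like, assuming the helper returns an empty value instead of `null` when the field is absent or null. To keep it self-contained, a plain `Map` stands in for Avro's `GenericRecord` and `java.util.Optional` stands in for Hudi's own `Option`; the class name and signature are hypothetical, not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: Map stands in for Avro's GenericRecord, and
// java.util.Optional stands in for Hudi's Option type.
final class NullableFieldSketch {

  private NullableFieldSketch() {
  }

  // Returns the field value as a string, or an empty Optional when the
  // field is missing or holds null (e.g. records written before the
  // operation field existed), so callers cannot forget the null case.
  static Optional<String> getNullableValAsString(Map<String, Object> rec, String fieldName) {
    return Optional.ofNullable(rec.get(fieldName)).map(Object::toString);
  }

  // Small helper so the sketch can build a one-field record for a demo.
  static Map<String, Object> singleField(String name, Object value) {
    Map<String, Object> rec = new HashMap<>();
    rec.put(name, value);
    return rec;
  }
}
```

The caller then handles the empty case explicitly (for example with `orElse`) instead of passing a `null` operation further down.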
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -141,6 +141,13 @@ private static Schema getWriteSchema(HoodieWriteConfig config) {
return new Schema.Parser().parse(config.getWriteSchema());
}
+ /**
+ * Whether to include the '_hoodie_operation' field in the metadata fields.
+ */
+ protected boolean withOperationField() {
Review comment:
Why can't this just be a `WriteConfig` option that we can set to different
values for Spark and Flink?
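A minimal sketch of the `WriteConfig` alternative, assuming the switch is a boolean property that the Flink writer sets to `true` while Spark leaves it at the default. The key `hoodie.example.with.operation.field` and the class are hypothetical, and `java.util.Properties` stands in for `HoodieWriteConfig`.

```java
import java.util.Properties;

// Hypothetical sketch: the operation-field switch read from write config
// instead of an engine-specific method override. The key name is an
// illustrative assumption, not a confirmed Hudi config key.
final class OperationFieldConfigSketch {
  static final String WITH_OPERATION_FIELD_KEY = "hoodie.example.with.operation.field";

  private final Properties props;

  OperationFieldConfigSketch(Properties props) {
    this.props = props;
  }

  // Defaults to false, so an engine that never sets the key keeps the
  // existing metadata fields unchanged.
  boolean withOperationField() {
    return Boolean.parseBoolean(props.getProperty(WITH_OPERATION_FIELD_KEY, "false"));
  }
}
```

With this shape, a shared write handle would ask the config rather than rely on a subclass overriding `withOperationField()`.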
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -350,18 +352,18 @@ public HoodieRestoreMetadata restore(HoodieEngineContext context, String restore
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
if (requireSortedRecords()) {
- return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+ return new FlinkCompactorSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
Review comment:
Wouldn't an advantage of doing this as a `WriteConfig` option be that these
new classes need not exist?
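To illustrate the point, a hypothetical sketch of a single handle whose behavior is driven by a config flag, so no Flink-specific subclass is needed. The class, its method, and the use of a `Map` in place of an Avro record are assumptions for illustration only; only the `_hoodie_operation` field name comes from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one merge handle parameterized by a flag read from
// config, instead of an engine-specific subclass overriding a hook.
final class ConfigDrivenMergeHandleSketch {
  static final String OPERATION_FIELD = "_hoodie_operation";

  private final boolean withOperationField;

  ConfigDrivenMergeHandleSketch(boolean withOperationField) {
    this.withOperationField = withOperationField;
  }

  // Copies the record and stamps the change flag only when the flag is on;
  // the input record is never mutated.
  Map<String, Object> prepareRecord(Map<String, Object> record, String flag) {
    Map<String, Object> out = new HashMap<>(record);
    if (withOperationField) {
      out.put(OPERATION_FIELD, flag);
    }
    return out;
  }
}
```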
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -191,6 +192,17 @@ protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
return hoodieRecord.getCurrentLocation() != null;
}
+ /**
+ * Add cdc operation to the record, default do nothing.
+ *
+ * @param record The record.
+ * @param flag The change flag name.
+ * @see HoodieOperation
+ */
+ protected void addOperationToRecord(GenericRecord record, String flag) {
Review comment:
Could we have an enum for the op flag?
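A minimal sketch of such an enum, assuming the flag values mirror Flink's `RowKind` short strings (`+I`, `-U`, `+U`, `-D`). The enum name and codes are assumptions; the javadoc in the hunk already points at `HoodieOperation`, whose definition is not part of this diff.

```java
// Hypothetical sketch of an enum for the change flag. Constant names and
// short codes mirror Flink's RowKind and are an assumption, not the enum
// the PR actually adds.
enum ChangeFlagSketch {
  INSERT("+I"),
  UPDATE_BEFORE("-U"),
  UPDATE_AFTER("+U"),
  DELETE("-D");

  private final String shortName;

  ChangeFlagSketch(String shortName) {
    this.shortName = shortName;
  }

  String shortName() {
    return shortName;
  }

  // Parses a stored flag back into the enum; unknown values fail loudly
  // instead of passing through as an arbitrary string.
  static ChangeFlagSketch fromShortName(String name) {
    for (ChangeFlagSketch f : values()) {
      if (f.shortName.equals(name)) {
        return f;
      }
    }
    throw new IllegalArgumentException("Unknown change flag: " + name);
  }
}
```

Taking the enum instead of a `String flag` in `addOperationToRecord` would let the compiler reject misspelled flags.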
--
This is an automated message from the Apache Git Service.
> Propagate CDC format for hoodie
> -------------------------------
>
> Key: HUDI-1771
> URL: https://issues.apache.org/jira/browse/HUDI-1771
> Project: Apache Hudi
> Issue Type: New Feature
> Components: Flink Integration
> Reporter: Danny Chen
> Assignee: Zheng yunhong
> Priority: Major
> Labels: pull-request-available, sev:normal
> Fix For: 0.9.0
>
>
> As we discussed on the dev mailing list:
> https://lists.apache.org/thread.html/r31b2d1404e4e043a5f875b78105ba6f9a801e78f265ad91242ad5eb2%40%3Cdev.hudi.apache.org%3E
> Keeping the change flags makes new use cases possible: using HUDI as the unified
> storage format for the DWD and DWS layers.
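To illustrate why keeping the change flags matters for a DWS-style aggregate built on a DWD table, here is a hypothetical sketch that sums an amount per key from a changelog. The flags are modeled on Flink's `RowKind` short strings; the classes are illustrative and not Hudi APIs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: folding a changelog into per-key sums. Flags are
// modeled on Flink's RowKind short strings (+I, -U, +U, -D).
final class ChangelogSumSketch {

  static final class Change {
    final String flag;
    final String key;
    final long amount;

    Change(String flag, String key, long amount) {
      this.flag = flag;
      this.key = key;
      this.amount = amount;
    }
  }

  // Applies +I/+U as additions and -U/-D as retractions, so an update
  // replaces the old contribution instead of double counting it.
  static Map<String, Long> sumByKey(List<Change> changes) {
    Map<String, Long> totals = new HashMap<>();
    for (Change c : changes) {
      long sign;
      switch (c.flag) {
        case "+I":
        case "+U":
          sign = 1;
          break;
        case "-U":
        case "-D":
          sign = -1;
          break;
        default:
          throw new IllegalArgumentException("Unknown change flag: " + c.flag);
      }
      totals.merge(c.key, sign * c.amount, Long::sum);
    }
    return totals;
  }
}
```

Without the `-U`/`-D` flags, a downstream sum would have no way to subtract the old contribution of an updated or deleted row.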
--
This message was sent by Atlassian Jira
(v8.3.4#803005)