weimingdiit commented on code in PR #6983:
URL: https://github.com/apache/hudi/pull/6983#discussion_r1007675272
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java:
##########
@@ -69,28 +73,29 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config,
String instantTime,
@Override
public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
- final HoodieRecord insertPayload = payload.record;
+ final HoodieRecord<T> insertPayload = payload.record;
String partitionPath = insertPayload.getPartitionPath();
HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
- if (handle == null) {
- // If the records are sorted, this means that we encounter a new partition path
- // and the records for the previous partition path are all written,
- // so we can safely closely existing open handle to reduce memory footprint.
- if (areRecordsSorted) {
- closeOpenHandles();
+ if (handle == null || !handle.canWrite(payload.record)) {
+ if (handle == null) {
+ // If the records are sorted, this means that we encounter a new partition path
+ // and the records for the previous partition path are all written,
+ // so we can safely closely existing open handle to reduce memory footprint.
+ if (areRecordsSorted) {
+ closeOpenHandles();
+ }
+ } else {
+ // Handle is full. Close the handle and add the WriteStatus
+ statuses.addAll(handle.close());
}
- // Lazily initialize the handle, for the first time
- handle = writeHandleFactory.create(config, instantTime, hoodieTable,
- insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
- handles.put(partitionPath, handle);
- }
-
- if (!handle.canWrite(payload.record)) {
- // Handle is full. Close the handle and add the WriteStatus
- statuses.addAll(handle.close());
- // Open new handle
+ Option<IndexedRecord> insertRecord = payload.insertValue;
+ // just skip the ignored record,do not make partitions on fs
+ if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
+ return;
+ }
Review Comment:
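I put the ignored-record check after the handle check so that it does not
have to run for every record. It only runs when the if condition is met,
that is, when the handle is null or cannot accept the record.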
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java:
##########
@@ -69,28 +73,29 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config,
String instantTime,
@Override
public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
- final HoodieRecord insertPayload = payload.record;
+ final HoodieRecord<T> insertPayload = payload.record;
String partitionPath = insertPayload.getPartitionPath();
HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
- if (handle == null) {
- // If the records are sorted, this means that we encounter a new partition path
- // and the records for the previous partition path are all written,
- // so we can safely closely existing open handle to reduce memory footprint.
- if (areRecordsSorted) {
- closeOpenHandles();
+ if (handle == null || !handle.canWrite(payload.record)) {
+ if (handle == null) {
Review Comment:
If I leave this logic unchanged, some code ends up duplicated.
In my first version, I checked for the ignored record first and left the
rest of the code unchanged, but then every record has to be checked, which
is unnecessary.
In my second version, I moved the ignored-record check into the if
conditions, but it was then needed twice, once in if (handle == null) {...}
and once in if (!handle.canWrite(payload.record)) {...}, and
writeHandleFactory.create() was also called in both places, which is
duplication.
That is why I changed the control flow.
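
To illustrate the flow described above, here is a minimal, self-contained sketch. It does not use the real Hudi classes: `Handle`, the string sentinel `IGNORE`, the fixed capacity of 2, and the counters are all hypothetical stand-ins. Removing a closed handle from the map is also a choice made for this sketch, not something shown in the diff. It only demonstrates that the ignored-record check and the handle creation each appear once, and that the check runs only when a new handle is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InsertFlowSketch {
  // Hypothetical sentinel standing in for IGNORE_RECORD.
  static final String IGNORE = "__ignore__";

  // Hypothetical stand-in for a write handle that holds at most `capacity` records.
  static class Handle {
    final String partition;
    final int capacity;
    int written = 0;

    Handle(String partition, int capacity) {
      this.partition = partition;
      this.capacity = capacity;
    }

    boolean canWrite() {
      return written < capacity;
    }

    void write(String record) {
      // Assumption of this sketch: the handle itself skips the sentinel.
      if (!IGNORE.equals(record)) {
        written++;
      }
    }

    String close() {
      return partition + ":" + written;
    }
  }

  final Map<String, Handle> handles = new HashMap<>();
  final List<String> statuses = new ArrayList<>();
  int handlesCreated = 0; // how many times the single creation site ran
  int ignoreChecks = 0;   // how many times the ignored-record check ran

  void consume(String partition, String record) {
    Handle handle = handles.get(partition);
    if (handle == null || !handle.canWrite()) {
      if (handle != null) {
        // Handle is full: close it and keep its status.
        statuses.add(handle.close());
        handles.remove(partition); // sketch-only choice, avoids closing twice
      }
      // Single ignored-record check, reached only when a new handle is needed.
      ignoreChecks++;
      if (IGNORE.equals(record)) {
        return; // do not create a handle (or a partition) for an ignored record
      }
      // Single creation site for both the "no handle" and "handle full" cases.
      handle = new Handle(partition, 2);
      handlesCreated++;
      handles.put(partition, handle);
    }
    handle.write(record);
  }

  public static void main(String[] args) {
    InsertFlowSketch sketch = new InsertFlowSketch();
    sketch.consume("p1", IGNORE);
    sketch.consume("p1", "a");
    sketch.consume("p1", "b");
    sketch.consume("p1", "c");
    System.out.println("handles created: " + sketch.handlesCreated);
    System.out.println("ignore checks:   " + sketch.ignoreChecks);
    System.out.println("statuses:        " + sketch.statuses);
  }
}
```

In this sketch, a record that arrives while its handle still has room never reaches the ignored-record check, which is the saving the restructured flow is after.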