danny0405 commented on a change in pull request #2553: URL: https://github.com/apache/hudi/pull/2553#discussion_r572838385
########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.DefaultSizeEstimator; +import org.apache.hudi.common.util.HoodieRecordSizeEstimator; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A {@link HoodieMergeHandle} that supports merge write 
incrementally(mini-batches). + * + * <p>For a new mini-batch, it initialize and set up the next file path to write, + * and closes the file path when the mini-batch write finish. When next mini-batch + * write starts, it rolls over to another new file. If all the mini-batches write finish + * for a checkpoint round, it renames the last new file path as the desired file name + * (name with the expected file ID). + * + * @param <T> Payload type + * @param <I> Input type + * @param <K> Key type + * @param <O> Output type + */ +public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O> + extends HoodieMergeHandle<T, I, K, O> + implements MiniBatchHandle { + + private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class); + + /** + * Records the current file handles number that rolls over. + */ + private int rollNumber = 0; + /** + * Records the rolled over file paths. + */ + private List<Path> rolloverPaths; + /** + * Whether it is the first time to generate file handle, E.G. the handle has not rolled over yet. + */ + private boolean needBootStrap = true; + + public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, + Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier); + rolloverPaths = new ArrayList<>(); + } + + /** + * Called by compactor code path. 
+ */ + public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, + Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId, + HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier); + } + + /** + * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. + */ + protected String dataFileName() { + return FSUtils.makeDataFileName(instantTime, writeToken, fileId + "-" + rollNumber, hoodieTable.getBaseFileExtension()); + } + + public boolean isNeedBootStrap() { + return needBootStrap; + } + + @Override + public List<WriteStatus> close() { + List<WriteStatus> writeStatus = super.close(); + this.needBootStrap = false; + return writeStatus; + } + + /** + * THe difference with the parent method is that there is no need to set up + * locations for the records. 
+ * + * @param fileId The file ID + * @param newRecordsItr The incremental records iterator + */ + @Override + protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) { + try { + // Load the new records in a map + long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps()); + LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); + this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), + new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema)); + } catch (IOException io) { + throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); + } + while (newRecordsItr.hasNext()) { + HoodieRecord<T> record = newRecordsItr.next(); + // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist + keyToNewRecords.put(record.getRecordKey(), record); + } + LOG.info("Number of entries in MemoryBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries() + + "Total size in bytes of MemoryBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in DiskBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => " + + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); + } + + /** + * + * Rolls over the write handle to prepare for the next batch write. 
+ * + * <p>It tweaks the handle state as following: + * + * <ul> + * <li>Increment the {@code rollNumber}</li> + * <li>Book-keep the last file path, these files (except the last one) are temporary that need to be cleaned</li> + * <li>Make the last new file path as old</li> + * <li>Initialize the new file path and file writer</li> + * </ul> + * + * @param newRecordsItr The records iterator to update + */ + public void rollOver(Iterator<HoodieRecord<T>> newRecordsItr) { + init(this.fileId, newRecordsItr); + this.recordsWritten = 0; + this.recordsDeleted = 0; + this.updatedRecordsWritten = 0; + this.insertRecordsWritten = 0; + this.writeStatus.setTotalErrorRecords(0); + this.timer = new HoodieTimer().startTimer(); + + rollNumber++; + + rolloverPaths.add(newFilePath); + oldFilePath = newFilePath; + // Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. + String newFileName = dataFileName(); + String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + + newFileName).toString(); + newFilePath = new Path(config.getBasePath(), relativePath); + + try { + fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier); + } catch (IOException e) { + throw new HoodieIOException("Error when creating file writer for path " + newFilePath, e); + } + + LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(), + newFilePath.toString())); + } + + public void finishWrite() { + for (int i = 0; i < rolloverPaths.size() - 1; i++) { + Path path = rolloverPaths.get(i); + try { + fs.delete(path, false); + } catch (IOException e) { + throw new HoodieIOException("Error when clean the temporary roll file: " + path, e); + } + } + Path lastPath = rolloverPaths.size() > 0 + ? 
rolloverPaths.get(rolloverPaths.size() - 1) + : newFilePath; + String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); + String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + + newFileName).toString(); + final Path desiredPath = new Path(config.getBasePath(), relativePath); + try { + fs.rename(lastPath, desiredPath); Review comment: Creating a new file would duplicate the data written by the last batch: it copies and rewrites all of the bucket data. Worse, when there is only one mini-batch, we still duplicate the write. I'm expecting a way that avoids the duplicate write and is also friendly to object stores. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
