yanghua commented on a change in pull request #2553: URL: https://github.com/apache/hudi/pull/2553#discussion_r572611335
########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * A {@link HoodieMergeHandle} that supports create write incrementally(mini-batches). Review comment: A `HoodieCreateHandle`? ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +/** + * Create handle factory for Flink writer, use the specified write handle directly. + */ +public class ExplicitCreateHandleFactory<T extends HoodieRecordPayload, I, K, O> Review comment: So what's the difference between `FlinkCreateHandleFactory` and `ExplicitCreateHandleFactory `? Is `FlinkCreateHandleFactory` useless? ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.DefaultSizeEstimator; +import org.apache.hudi.common.util.HoodieRecordSizeEstimator; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A {@link HoodieMergeHandle} that supports merge write incrementally(mini-batches). + * + * <p>For a new mini-batch, it initialize and set up the next file path to write, + * and closes the file path when the mini-batch write finish. When next mini-batch + * write starts, it rolls over to another new file. If all the mini-batches write finish + * for a checkpoint round, it renames the last new file path as the desired file name + * (name with the expected file ID). + * Review comment: IMO, can we ignore the concept of the `mini-batch` in the implementation(interface). Making it a default mechansim? ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java ########## @@ -61,14 +62,117 @@ protected HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineCont super(config, context, metaClient); } + /** + * Upsert a batch of new records into Hoodie table at the supplied instantTime. + * + * <p>Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param records hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata<List<WriteStatus>> upsert( + HoodieEngineContext context, + HoodieWriteHandle<?, ?, ?, ?> writeHandle, + String instantTime, + List<HoodieRecord<T>> records) { + return new FlinkUpsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute(); + } + + /** + * Insert a batch of new records into Hoodie table at the supplied instantTime. + * + * <p>Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param records hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata<List<WriteStatus>> insert( + HoodieEngineContext context, + HoodieWriteHandle<?, ?, ?, ?> writeHandle, + String instantTime, + List<HoodieRecord<T>> records) { + return new FlinkInsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute(); + } + + /** + * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be + * de-duped and non existent keys will be removed before deleting. + * + * <p>Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param keys {@link List} of {@link HoodieKey}s to be deleted + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata<List<WriteStatus>> delete( + HoodieEngineContext context, + HoodieWriteHandle<?, ?, ?, ?> writeHandle, + String instantTime, + List<HoodieKey> keys) { + return new FlinkDeleteCommitActionExecutor<>(context, writeHandle, config, this, instantTime, keys).execute(); + } + + /** + * Upserts the given prepared records into the Hoodie table, at the supplied instantTime. + * + * <p>This implementation requires that the input records are already tagged, and de-duped if needed. + * + * <p>Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param instantTime Instant Time for the action + * @param preppedRecords hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped( + HoodieEngineContext context, + HoodieWriteHandle<?, ?, ?, ?> writeHandle, + String instantTime, + List<HoodieRecord<T>> preppedRecords) { + return new FlinkUpsertPreppedCommitActionExecutor<>(context, writeHandle, config, this, instantTime, preppedRecords).execute(); + } + + /** + * Inserts the given prepared records into the Hoodie table, at the supplied instantTime. + * + * <p>This implementation requires that the input records are already tagged, and de-duped if needed. + * + * <p>Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param instantTime Instant Time for the action + * @param preppedRecords hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata<List<WriteStatus>> insertPrepped( + HoodieEngineContext context, + HoodieWriteHandle<?, ?, ?, ?> writeHandle, + String instantTime, + List<HoodieRecord<T>> preppedRecords) { + return new FlinkInsertPreppedCommitActionExecutor<>(context, writeHandle, config, this, instantTime, preppedRecords).execute(); + } + @Override public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) { - return new FlinkUpsertCommitActionExecutor<>(context, config, this, instantTime, records).execute(); + throw new IllegalAccessError("This method should not be invoked"); Review comment: Maybe `UnsupportedOperationException` more reasonable? ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.DefaultSizeEstimator; +import org.apache.hudi.common.util.HoodieRecordSizeEstimator; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A {@link HoodieMergeHandle} that supports merge write incrementally(mini-batches). + * + * <p>For a new mini-batch, it initialize and set up the next file path to write, + * and closes the file path when the mini-batch write finish. When next mini-batch + * write starts, it rolls over to another new file. If all the mini-batches write finish + * for a checkpoint round, it renames the last new file path as the desired file name + * (name with the expected file ID). + * + * @param <T> Payload type + * @param <I> Input type + * @param <K> Key type + * @param <O> Output type + */ +public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O> + extends HoodieMergeHandle<T, I, K, O> + implements MiniBatchHandle { + + private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class); + + /** + * Records the current file handles number that rolls over. + */ + private int rollNumber = 0; + /** + * Records the rolled over file paths. + */ + private List<Path> rolloverPaths; + /** + * Whether it is the first time to generate file handle, E.G. the handle has not rolled over yet. + */ + private boolean needBootStrap = true; + + public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, + Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier); + rolloverPaths = new ArrayList<>(); + } + + /** + * Called by compactor code path. + */ + public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, + Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId, + HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier); + } + + /** + * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. + */ + protected String generatesDataFileName() { + return FSUtils.makeDataFileName(instantTime, writeToken, fileId + "-" + rollNumber, hoodieTable.getBaseFileExtension()); + } + + public boolean isNeedBootStrap() { + return needBootStrap; + } + + @Override + public List<WriteStatus> close() { + List<WriteStatus> writeStatus = super.close(); + this.needBootStrap = false; + return writeStatus; + } + + /** + * The difference with the parent method is that there is no need to set up + * locations for the records. + * + * @param fileId The file ID + * @param newRecordsItr The incremental records iterator + */ + @Override + protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) { + try { + // Load the new records in a map + long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps()); + LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); + this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), + new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema)); + } catch (IOException io) { + throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); + } Review comment: At least, this segment is the same, right? Let's reduce the duplicated code. ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * A {@link HoodieMergeHandle} that supports create write incrementally(mini-batches). + * + * <p>For the first mini-batch, it initialize and set up the next file path to write, + * but does not close the file writer until all the mini-batches write finish. Each mini-batch + * data are appended to the same file. + * + * @param <T> Payload type + * @param <I> Input type + * @param <K> Key type + * @param <O> Output type + */ +public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O> + extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle { + + private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class); + private long lastFileSize = 0L; + + public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, + String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) { + this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config), + taskContextSupplier); + } + + public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, + String partitionPath, String fileId, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, partitionPath, fileId, writerSchemaIncludingAndExcludingMetadataPair, + taskContextSupplier); + } + + /** + * Called by the compactor code path. + */ + public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, + String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, partitionPath, fileId, recordMap, taskContextSupplier); + } + + /** + * Get the incremental write status. In mini-batch write mode, + * this handle would be reused for a checkpoint bucket(the bucket is appended as mini-batches), + * thus, after a mini-batch append finish, we do not close the underneath writer but return + * the incremental WriteStatus instead. + * + * @return the incremental write status + */ + private WriteStatus incWriteStatus() { Review comment: Let's rename it to be `getIncrementalWriteStatus`? The prefix `inc` may associate with `increase` would make users confused. ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.DefaultSizeEstimator; +import org.apache.hudi.common.util.HoodieRecordSizeEstimator; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A {@link HoodieMergeHandle} that supports merge write incrementally(mini-batches). + * + * <p>For a new mini-batch, it initialize and set up the next file path to write, + * and closes the file path when the mini-batch write finish. When next mini-batch + * write starts, it rolls over to another new file. If all the mini-batches write finish + * for a checkpoint round, it renames the last new file path as the desired file name + * (name with the expected file ID). + * + * @param <T> Payload type + * @param <I> Input type + * @param <K> Key type + * @param <O> Output type + */ +public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O> + extends HoodieMergeHandle<T, I, K, O> + implements MiniBatchHandle { + + private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class); + + /** + * Records the current file handles number that rolls over. + */ + private int rollNumber = 0; + /** + * Records the rolled over file paths. + */ + private List<Path> rolloverPaths; + /** + * Whether it is the first time to generate file handle, E.G. the handle has not rolled over yet. + */ + private boolean needBootStrap = true; + + public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, + Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier); + rolloverPaths = new ArrayList<>(); + } + + /** + * Called by compactor code path. + */ + public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, + Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId, + HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier); + } + + /** + * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. + */ + protected String dataFileName() { + return FSUtils.makeDataFileName(instantTime, writeToken, fileId + "-" + rollNumber, hoodieTable.getBaseFileExtension()); + } + + public boolean isNeedBootStrap() { + return needBootStrap; + } + + @Override + public List<WriteStatus> close() { + List<WriteStatus> writeStatus = super.close(); + this.needBootStrap = false; + return writeStatus; + } + + /** + * THe difference with the parent method is that there is no need to set up + * locations for the records. + * + * @param fileId The file ID + * @param newRecordsItr The incremental records iterator + */ + @Override + protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) { + try { + // Load the new records in a map + long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps()); + LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); + this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), + new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema)); + } catch (IOException io) { + throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); + } + while (newRecordsItr.hasNext()) { + HoodieRecord<T> record = newRecordsItr.next(); + // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist + keyToNewRecords.put(record.getRecordKey(), record); + } + LOG.info("Number of entries in MemoryBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries() + + "Total size in bytes of MemoryBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in DiskBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => " + + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); + } + + /** + * + * Rolls over the write handle to prepare for the next batch write. + * + * <p>It tweaks the handle state as following: + * + * <ul> + * <li>Increment the {@code rollNumber}</li> + * <li>Book-keep the last file path, these files (except the last one) are temporary that need to be cleaned</li> + * <li>Make the last new file path as old</li> + * <li>Initialize the new file path and file writer</li> + * </ul> + * + * @param newRecordsItr The records iterator to update + */ + public void rollOver(Iterator<HoodieRecord<T>> newRecordsItr) { + init(this.fileId, newRecordsItr); + this.recordsWritten = 0; + this.recordsDeleted = 0; + this.updatedRecordsWritten = 0; + this.insertRecordsWritten = 0; + this.writeStatus.setTotalErrorRecords(0); + this.timer = new HoodieTimer().startTimer(); + + rollNumber++; + + rolloverPaths.add(newFilePath); + oldFilePath = newFilePath; + // Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. + String newFileName = dataFileName(); + String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + + newFileName).toString(); + newFilePath = new Path(config.getBasePath(), relativePath); + + try { + fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier); + } catch (IOException e) { + throw new HoodieIOException("Error when creating file writer for path " + newFilePath, e); + } + + LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(), + newFilePath.toString())); + } + + public void finishWrite() { + for (int i = 0; i < rolloverPaths.size() - 1; i++) { + Path path = rolloverPaths.get(i); + try { + fs.delete(path, false); + } catch (IOException e) { + throw new HoodieIOException("Error when clean the temporary roll file: " + path, e); + } + } + Path lastPath = rolloverPaths.size() > 0 + ? rolloverPaths.get(rolloverPaths.size() - 1) + : newFilePath; + String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); + String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + + newFileName).toString(); + final Path desiredPath = new Path(config.getBasePath(), relativePath); + try { + fs.rename(lastPath, desiredPath); Review comment: You may try to create a new file instead of renaming the current file? Refer to `TimelineLayoutV1`? ########## File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java ########## @@ -293,4 +295,9 @@ public static void initTableIfNotExists(Configuration conf) throws IOException { public static String generateBucketKey(String partitionPath, String fileId) { return String.format("%s_%s", partitionPath, fileId); } + + /** Returns whether the location represents an insert. */ + public static boolean isInsert(HoodieRecordLocation loc) { + return Objects.equals(loc.getInstantTime(), "I"); Review comment: Is `String#equals()` not suitable? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
