nbalajee commented on code in PR #9035:
URL: https://github.com/apache/hudi/pull/9035#discussion_r1259981790
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
*
* @param partitionPath Partition path
*/
- protected void createMarkerFile(String partitionPath, String dataFileName) {
- WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
- .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());
+ protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+ WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ if (!writeMarkers.doesMarkerDirExist()) {
+ throw new HoodieIOException(String.format("Marker root directory absent : %s/%s (%s)",
+ partitionPath, dataFileName, markerInstantTime));
+ }
+ if (config.enforceFinalizeWriteCheck()
+ && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("", "FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {
+ throw new HoodieCorruptedDataException("Reconciliation for instant " + instantTime + " is completed, job is trying to re-write the data files.");
+ }
+ if (config.enforceCompletionMarkerCheck()
+ && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(partitionPath, fileId, markerInstantTime, getIOType()))) {
+ throw new HoodieIOException("Completed marker file exists for : " + dataFileName + " (" + instantTime + ")");
+ }
+ writeMarkers.create(partitionPath, dataFileName, getIOType());
+ }
+
+ // visible for testing
+ public void createCompletedMarkerFile(String partition, String markerInstantTime) throws IOException {
+ try {
+ WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
+ .createCompletionMarker(partition, fileId, markerInstantTime, getIOType(), true);
+ } catch (Exception e) {
+ // Clean up the data file, if the marker is already present or marker directories don't exist.
+ Path partitionPath = FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partition);
Review Comment:
> In which case might we hit this? Can you help clarify, please?
@nsivabalan - Suppose a task is running to create a data file. The execution
engine retries the operation on a different container/task (say, stage attempt
1, due to a stage failure) while the previous task is still running in the
background (unresponsive to the driver).
If the second task (the retry attempt) finishes the data write and creates the
completion marker first, then when the first task (in this example) closes its
writeHandle, it cleans up its own data file after noticing that the completion
marker is already present.
If the first task's writeHandle were closed only after the
finalizeWrite/reconcileMarkers step, we would end up with a duplicate file,
because the data files created by both attempts would be present (the
reconcile step would not remove them).
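The duplicate-file risk can be illustrated with a simplified in-memory model. It assumes that reconcile deletes data files that have markers but are not part of the commit, and that a `finalized` flag plays the role of the `FINALIZE_WRITE` marker checked under `enforceFinalizeWriteCheck()`. `ReconcileSketch`, `write` and `reconcile` are hypothetical names, and `IllegalStateException` stands in for `HoodieCorruptedDataException`.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public class ReconcileSketch {
  // Hypothetical in-memory stand-ins for a partition's data files and markers.
  final Set<String> dataFiles = new LinkedHashSet<>();
  final Set<String> markers = new HashSet<>();
  // Stand-in for the FINALIZE_WRITE marker.
  boolean finalized = false;
  final boolean enforceFinalizeCheck;

  ReconcileSketch(boolean enforceFinalizeCheck) {
    this.enforceFinalizeCheck = enforceFinalizeCheck;
  }

  // A task attempt writes a data file: in-progress marker first, then the file.
  void write(String file) {
    if (enforceFinalizeCheck && finalized) {
      throw new IllegalStateException("Reconciliation completed; refusing to write " + file);
    }
    markers.add(file);
    dataFiles.add(file);
  }

  // Reconcile: delete marked files that are not in the commit, then mark as finalized.
  void reconcile(Set<String> committed) {
    for (String f : markers) {
      if (!committed.contains(f)) {
        dataFiles.remove(f);
      }
    }
    finalized = true;
  }

  public static void main(String[] args) {
    Set<String> committed = Set.of("f1_attempt1.parquet");

    ReconcileSketch unguarded = new ReconcileSketch(false);
    unguarded.write("f1_attempt1.parquet");
    unguarded.reconcile(committed);
    unguarded.write("f1_attempt0.parquet"); // slow attempt lands after reconcile
    System.out.println("unguarded files: " + unguarded.dataFiles);

    ReconcileSketch guarded = new ReconcileSketch(true);
    guarded.write("f1_attempt1.parquet");
    guarded.reconcile(committed);
    try {
      guarded.write("f1_attempt0.parquet");
    } catch (IllegalStateException e) {
      System.out.println("guarded: " + e.getMessage());
    }
    System.out.println("guarded files: " + guarded.dataFiles);
  }
}
```

Without the guard, the slow attempt's file appears after reconcile has run, so both copies survive; with the guard, the late write is rejected before any file is created.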
@danny0405 - Updated to HoodieException
--
This is an automated message from the Apache Git Service.