nbalajee commented on code in PR #9035:
URL: https://github.com/apache/hudi/pull/9035#discussion_r1260374587
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -135,18 +155,42 @@ private String translateMarkerToDataPath(String
markerPath) {
return stripMarkerSuffix(rPath);
}
+ public static String stripMarkerSuffix(String path) {
+ return path.substring(0,
path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN));
+ }
+
+ public static String stripOldStyleMarkerSuffix(String path) {
+ // marker file was created by older version of Hudi, with
INPROGRESS_MARKER_EXTN (f1_w1_c1.marker).
+ // Rename to data file by replacing .marker with .parquet.
+ return String.format("%s%s", path.substring(0,
path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)),
+ HoodieFileFormat.PARQUET.getFileExtension());
+ }
+
@Override
public Set<String> allMarkerFilePaths() throws IOException {
Set<String> markerFiles = new HashSet<>();
if (doesMarkerDirExist()) {
FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
-
markerFiles.add(MarkerUtils.stripMarkerFolderPrefix(fileStatus.getPath().toString(),
basePath, instantTime));
+ // Only the in-progress marker files are to be included here
+ if
(fileStatus.getPath().toString().contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN))
{
Review Comment:
explained above.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -119,7 +139,7 @@ public Set<String>
createdAndMergedDataPaths(HoodieEngineContext context, int pa
while (itr.hasNext()) {
FileStatus status = itr.next();
String pathStr = status.getPath().toString();
- if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) &&
!pathStr.endsWith(IOType.APPEND.name())) {
+ if (pathStr.contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)
&& !pathStr.endsWith(IOType.APPEND.name())) {
result.add(translateMarkerToDataPath(pathStr));
Review Comment:
1. Every attempt to create a data file has a corresponding in-progress
marker file.
2. A successful attempt at creating a data file creates a completion
marker (if the flag is enabled).
3. If a second/subsequent attempt is made to recreate the file and a
completion marker already exists when the write handle is being created, then
(if the flag is enabled) the old write status is returned and the attempt is
considered successful.
4. If a second attempt starts while the first write is still in progress
(file write not yet completed), a new in-progress marker is created for the
second/subsequent attempt. When closing its writeHandle/file, the first
process creates the completion marker. Second/subsequent attempts that then
try to close their writeHandle clean up the data file upon seeing the
completion marker.
5. When reconciling data files, all in-progress markers are read and the
list is pruned by removing entries that have a writeStatus. Data files that
have been created but have no corresponding write status are candidates
for deletion during finalizeWrite.
During step 5, only in-progress markers are considered (completion markers
are not, because writeStatus is used as the source of truth).
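As a rough illustration of step 5 (not the PR's actual code), the pruning can
be viewed as a set difference. The class name, method name, and sample paths
below are hypothetical, and the sketch assumes the in-progress markers have
already been translated to data file paths.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the reconciliation described in step 5.
class MarkerReconciliationSketch {

    // Returns data paths that have an in-progress marker but no write status.
    // These are the candidates for deletion during finalizeWrite.
    static Set<String> invalidDataPaths(Set<String> inProgressMarkerDataPaths,
                                        Set<String> writeStatusDataPaths) {
        // Copy first so the caller's set is not mutated; TreeSet gives a stable order.
        Set<String> candidates = new TreeSet<>(inProgressMarkerDataPaths);
        // Prune every entry that is backed by a write status (the source of truth).
        candidates.removeAll(writeStatusDataPaths);
        return candidates;
    }

    public static void main(String[] args) {
        // Made-up paths: the second entry stands for a retried attempt
        // that never produced a write status.
        Set<String> markers = new HashSet<>(Arrays.asList(
            "p1/f1_w1_c1.parquet", "p1/f1_w2_c1.parquet", "p1/f2_w1_c1.parquet"));
        Set<String> statuses = new HashSet<>(Arrays.asList(
            "p1/f1_w1_c1.parquet", "p1/f2_w1_c1.parquet"));
        System.out.println(invalidDataPaths(markers, statuses));
    }
}
```

Completion markers do not appear in this sketch at all, which matches the
point above: only in-progress markers and write statuses take part in the
reconciliation.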
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath,
String fileName) {
*
* @param partitionPath Partition path
*/
- protected void createMarkerFile(String partitionPath, String dataFileName) {
- WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
- .create(partitionPath, dataFileName, getIOType(), config, fileId,
hoodieTable.getMetaClient().getActiveTimeline());
+ protected void createInProgressMarkerFile(String partitionPath, String
dataFileName, String markerInstantTime) {
+ WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ if (!writeMarkers.doesMarkerDirExist()) {
+ throw new HoodieIOException(String.format("Marker root directory absent
: %s/%s (%s)",
+ partitionPath, dataFileName, markerInstantTime));
+ }
+ if (config.enforceFinalizeWriteCheck()
+ && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("",
"FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {
Review Comment:
> In general, instead of exactly match of the state, I prefer the final
consistency of the files diff we have on master.
+1 on performing the final consistency check of files based on the write
statuses. This is our preferred approach. However, this change addresses
extreme corner cases that could leave extra files in the dataset's
partition folder, where they might appear as dupes.
For datasets where global indexes are enabled, it is difficult to tell
whether dupes are caused by leftover data files created by retries or by a
genuine issue with the global index (e.g., HBase or record-level index).
This change helps plug the race windows in which multiple attempts at file
creation, triggered by infrastructure-related issues/instabilities, result in
duplicate copies of data.