danny0405 commented on code in PR #9035:
URL: https://github.com/apache/hudi/pull/9035#discussion_r1238326624
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -901,6 +901,9 @@ private void startCommit(String instantTime, String
actionType, HoodieTableMetaC
metaClient.getActiveTimeline().createNewInstant(new
HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
instantTime));
}
+
+ // populate marker directory for the commit.
+ WriteMarkersFactory.get(config.getMarkersType(), createTable(config,
hadoopConf), instantTime).createMarkerDir();
}
Review Comment:
We can refactor the API to:
```java
public static WriteMarkers get(MarkerType markerType, HoodieTableMetaClient
metaClient, String instantTime)
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -512,6 +512,7 @@ public List<WriteStatus> close() {
status.getStat().setFileSizeInBytes(logFileSize);
}
+ createCompletedMarkerFile(partitionPath, baseInstantTime);
Review Comment:
Can we create the file only when necessary, i.e. when `enforceFinalizeWriteCheck()`
or `enforceCompletionMarkerCheck()` is true?
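As a rough illustration of the guard being asked for, here is a minimal sketch. It assumes stand-in types: `SketchWriteConfig` and `SketchWriteHandle` are hypothetical and are not the real `HoodieWriteConfig` or `HoodieAppendHandle`, and the marker creation is replaced by a counter so the snippet runs without Hudi on the classpath. Only the two flag names come from the PR.

```java
// Hypothetical stand-in for the write config; only the two flags
// mentioned in the review comment are modeled.
final class SketchWriteConfig {
  private final boolean finalizeWriteCheck;
  private final boolean completionMarkerCheck;

  SketchWriteConfig(boolean finalizeWriteCheck, boolean completionMarkerCheck) {
    this.finalizeWriteCheck = finalizeWriteCheck;
    this.completionMarkerCheck = completionMarkerCheck;
  }

  boolean enforceFinalizeWriteCheck() {
    return finalizeWriteCheck;
  }

  boolean enforceCompletionMarkerCheck() {
    return completionMarkerCheck;
  }
}

// Hypothetical stand-in for a write handle. It counts marker creations
// instead of touching storage.
final class SketchWriteHandle {
  private final SketchWriteConfig config;
  private int completedMarkersCreated = 0;

  SketchWriteHandle(SketchWriteConfig config) {
    this.config = config;
  }

  // Placeholder for the PR's createCompletedMarkerFile(partitionPath, instantTime).
  private void createCompletedMarkerFile(String partitionPath, String instantTime) {
    completedMarkersCreated++;
  }

  // Mirrors the tail of close(): the completion marker is written only
  // when at least one of the two checks is enabled.
  void close(String partitionPath, String instantTime) {
    if (config.enforceFinalizeWriteCheck() || config.enforceCompletionMarkerCheck()) {
      createCompletedMarkerFile(partitionPath, instantTime);
    }
  }

  int completedMarkersCreated() {
    return completedMarkersCreated;
  }
}
```

With both flags off, `close()` skips the extra file creation entirely, so the default configuration would pay no additional storage call per handle.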
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath,
String fileName) {
*
* @param partitionPath Partition path
*/
- protected void createMarkerFile(String partitionPath, String dataFileName) {
- WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
- .create(partitionPath, dataFileName, getIOType(), config, fileId,
hoodieTable.getMetaClient().getActiveTimeline());
+ protected void createInProgressMarkerFile(String partitionPath, String
dataFileName, String markerInstantTime) {
+ WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ if (!writeMarkers.doesMarkerDirExist()) {
+ throw new HoodieIOException(String.format("Marker root directory absent
: %s/%s (%s)",
+ partitionPath, dataFileName, markerInstantTime));
+ }
+ if (config.enforceFinalizeWriteCheck()
+ && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("",
"FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {
+ throw new HoodieCorruptedDataException("Reconciliation for instant " +
instantTime + " is completed, job is trying to re-write the data files.");
Review Comment:
I'm not sure what the purpose of this check is; it seems a file with
`FINALIZE_WRITE` is never created anywhere in the code.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -135,18 +155,42 @@ private String translateMarkerToDataPath(String
markerPath) {
return stripMarkerSuffix(rPath);
}
+ public static String stripMarkerSuffix(String path) {
+ return path.substring(0,
path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN));
+ }
+
+ public static String stripOldStyleMarkerSuffix(String path) {
+ // marker file was created by older version of Hudi, with
INPROGRESS_MARKER_EXTN (f1_w1_c1.marker).
+ // Rename to data file by replacing .marker with .parquet.
+ return String.format("%s%s", path.substring(0,
path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)),
+ HoodieFileFormat.PARQUET.getFileExtension());
+ }
+
@Override
public Set<String> allMarkerFilePaths() throws IOException {
Set<String> markerFiles = new HashSet<>();
if (doesMarkerDirExist()) {
FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
-
markerFiles.add(MarkerUtils.stripMarkerFolderPrefix(fileStatus.getPath().toString(),
basePath, instantTime));
+ // Only the inprogres markerFiles are to be included here
+ if
(fileStatus.getPath().toString().contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN))
{
Review Comment:
Why include only in-progress marker files?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java:
##########
@@ -86,6 +89,17 @@ public TimelineServerBasedWriteMarkers(HoodieTable table,
String instantTime) {
this.timeoutSecs = timeoutSecs;
}
+ @Override
+ protected Path getMarkerPath(String partitionPath, String dataFileName,
IOType type) {
+ return new Path(partitionPath, getMarkerFileName(dataFileName, type));
+ }
Review Comment:
Why override the partition path as relative?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -612,6 +612,20 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("0.10.0")
.withDocumentation("File Id Prefix provider class, that implements
`org.apache.hudi.fileid.FileIdPrefixProvider`");
+ public static final ConfigProperty<String> ENFORCE_COMPLETION_MARKER_CHECKS
= ConfigProperty
+ .key("hoodie.markers.enforce.completion.checks")
+ .defaultValue("false")
+ .sinceVersion("0.10.0")
+ .withDocumentation("Prevents the creation of duplicate data files, when
multiple spark tasks are racing to "
+ + "create data files and a completed data file is already present");
+
+ public static final ConfigProperty<String> ENFORCE_FINALIZE_WRITE_CHECK =
ConfigProperty
+ .key("hoodie.markers.enforce.finalize.write.check")
+ .defaultValue("false")
+ .sinceVersion("0.10.0")
Review Comment:
Fix the `sinceVersion`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -441,6 +441,9 @@ public List<WriteStatus> close() {
performMergeDataValidationCheck(writeStatus);
+ // createCompleteMarkerFile throws hoodieException, if marker directory
is not present.
+ createCompletedMarkerFile(partitionPath, this.instantTime);
Review Comment:
Ditto: create when necessary.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -119,7 +139,7 @@ public Set<String>
createdAndMergedDataPaths(HoodieEngineContext context, int pa
while (itr.hasNext()) {
FileStatus status = itr.next();
String pathStr = status.getPath().toString();
- if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) &&
!pathStr.endsWith(IOType.APPEND.name())) {
+ if (pathStr.contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)
&& !pathStr.endsWith(IOType.APPEND.name())) {
result.add(translateMarkerToDataPath(pathStr));
Review Comment:
Why do we ignore the handling of completion marker files?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java:
##########
@@ -217,6 +218,8 @@ public List<WriteStatus> close() {
setupWriteStatus();
+ // createCompleteMarkerFile throws hoodieException, if marker directory
is not present.
+ createCompletedMarkerFile(partitionPath, this.instantTime);
Review Comment:
Ditto: create when necessary.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java:
##########
@@ -132,6 +153,25 @@ public Set<String> allMarkerFilePaths() {
}
}
+ @Override
+ public void createMarkerDir() throws HoodieIOException {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Map<String, String> paramsMap = new HashMap<>();
+ paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
Review Comment:
Do we need an HTTP request to create the marker dir, then? The server is
already located on the driver, and we already create a marker dir when starting
a new instant.