nsivabalan commented on code in PR #9035:
URL: https://github.com/apache/hudi/pull/9035#discussion_r1240377379
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath,
String fileName) {
*
* @param partitionPath Partition path
*/
- protected void createMarkerFile(String partitionPath, String dataFileName) {
- WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
- .create(partitionPath, dataFileName, getIOType(), config, fileId,
hoodieTable.getMetaClient().getActiveTimeline());
+ protected void createInProgressMarkerFile(String partitionPath, String
dataFileName, String markerInstantTime) {
+ WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ if (!writeMarkers.doesMarkerDirExist()) {
Review Comment:
May I know why this additional check is needed? Was there a bug we
encountered that led us to eagerly create the marker dir upfront?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath,
String fileName) {
*
* @param partitionPath Partition path
*/
- protected void createMarkerFile(String partitionPath, String dataFileName) {
- WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
- .create(partitionPath, dataFileName, getIOType(), config, fileId,
hoodieTable.getMetaClient().getActiveTimeline());
+ protected void createInProgressMarkerFile(String partitionPath, String
dataFileName, String markerInstantTime) {
+ WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ if (!writeMarkers.doesMarkerDirExist()) {
+ throw new HoodieIOException(String.format("Marker root directory absent
: %s/%s (%s)",
+ partitionPath, dataFileName, markerInstantTime));
+ }
+ if (config.enforceFinalizeWriteCheck()
+ && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("",
"FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {
Review Comment:
Can we create a constant for "FINALIZE_WRITE"?
Also, use StringUtils.EMPTY_STRING for "".
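A minimal sketch of what this could look like, under stated assumptions: the
holder class and the name `FINALIZE_WRITE_COMPLETION_MARKER` are hypothetical,
and `EMPTY_STRING` below is a local stand-in for `StringUtils.EMPTY_STRING` so
that the snippet compiles on its own.

```java
// Hypothetical holder for the literals currently inlined in createInProgressMarkerFile.
final class MarkerConstantsSketch {
  // Local stand-in for StringUtils.EMPTY_STRING so the sketch is self-contained.
  static final String EMPTY_STRING = "";
  // Hypothetical constant name replacing the inline "FINALIZE_WRITE" literal.
  static final String FINALIZE_WRITE_COMPLETION_MARKER = "FINALIZE_WRITE";

  private MarkerConstantsSketch() {
  }

  // Illustrative only: a call site that reads the constants instead of raw literals.
  static String describeFinalizeMarker(String markerInstantTime) {
    return String.format("partition='%s' fileId='%s' instant='%s'",
        EMPTY_STRING, FINALIZE_WRITE_COMPLETION_MARKER, markerInstantTime);
  }
}
```

With constants like these, the call in the diff would pass the two named values
instead of `""` and `"FINALIZE_WRITE"`.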
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -612,6 +612,20 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("0.10.0")
.withDocumentation("File Id Prefix provider class, that implements
`org.apache.hudi.fileid.FileIdPrefixProvider`");
+ public static final ConfigProperty<String> ENFORCE_COMPLETION_MARKER_CHECKS
= ConfigProperty
+ .key("hoodie.markers.enforce.completion.checks")
+ .defaultValue("false")
+ .sinceVersion("0.10.0")
+ .withDocumentation("Prevents the creation of duplicate data files, when
multiple spark tasks are racing to "
+ + "create data files and a completed data file is already present");
+
+ public static final ConfigProperty<String> ENFORCE_FINALIZE_WRITE_CHECK =
ConfigProperty
+ .key("hoodie.markers.enforce.finalize.write.check")
+ .defaultValue("false")
+ .sinceVersion("0.10.0")
+ .withDocumentation("When WriteStatus obj is lost due to engine related
failures, then recomputing would involve "
+ + "re-writing all the data files. When this check is enabled it
would block the rewrite from happening.");
Review Comment:
The docs are not very clear to me. Can you please elaborate further?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -612,6 +612,20 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("0.10.0")
.withDocumentation("File Id Prefix provider class, that implements
`org.apache.hudi.fileid.FileIdPrefixProvider`");
+ public static final ConfigProperty<String> ENFORCE_COMPLETION_MARKER_CHECKS
= ConfigProperty
+ .key("hoodie.markers.enforce.completion.checks")
Review Comment:
`hoodie.optimize.task.retries` might be a more generic name.
Markers are just one way of achieving it.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -441,6 +441,9 @@ public List<WriteStatus> close() {
performMergeDataValidationCheck(writeStatus);
+ // createCompleteMarkerFile throws hoodieException, if marker directory
is not present.
+ createCompletedMarkerFile(partitionPath, this.instantTime);
Review Comment:
+1
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -512,6 +512,7 @@ public List<WriteStatus> close() {
status.getStat().setFileSizeInBytes(logFileSize);
}
+ createCompletedMarkerFile(partitionPath, baseInstantTime);
Review Comment:
Yes. Why are we creating completion markers by default? I was also expecting
this to be guarded by the new configs added.
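One way to read this request, sketched with stand-in types.
`enforceCompletionMarkerCheck()` and `enforceFinalizeWriteCheck()` are the
accessor names used elsewhere in this PR; the `Config` interface, the
`Runnable` marker writer, and the method name are hypothetical placeholders,
and which of the two configs should gate the call is an assumption.

```java
final class CompletionMarkerGuardSketch {
  // Stand-in for the two write-config accessors referenced in this PR.
  interface Config {
    boolean enforceCompletionMarkerCheck();

    boolean enforceFinalizeWriteCheck();
  }

  private CompletionMarkerGuardSketch() {
  }

  // Runs the marker-creating action only when a check that consumes the
  // completion marker is enabled. Returns true if the action ran.
  static boolean createCompletedMarkerIfEnabled(Config config, Runnable createMarker) {
    if (!config.enforceCompletionMarkerCheck() && !config.enforceFinalizeWriteCheck()) {
      // Both configs default to "false": skip the extra marker write.
      return false;
    }
    createMarker.run();
    return true;
  }
}
```

In `close()`, the `createMarker` action would wrap the existing
`createCompletedMarkerFile(partitionPath, baseInstantTime)` call.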
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -119,7 +139,7 @@ public Set<String>
createdAndMergedDataPaths(HoodieEngineContext context, int pa
while (itr.hasNext()) {
FileStatus status = itr.next();
String pathStr = status.getPath().toString();
- if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) &&
!pathStr.endsWith(IOType.APPEND.name())) {
+ if (pathStr.contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)
&& !pathStr.endsWith(IOType.APPEND.name())) {
result.add(translateMarkerToDataPath(pathStr));
Review Comment:
Actually, we are interested only in in-progress markers here.
The crux is this: if Spark retries and, in the 2nd attempt, a data file was
created but never completed (possible in HDFS), it may not have a completion
marker. From the reconcile standpoint, we need to get hold of all in-progress
files; it does not matter whether they are completed or not.
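To illustrate the point, here is a simplified sketch of the filter in the
hunk above. The `.marker` suffix value, the sample paths, and the
`.completed` name for a completion marker are assumptions made for the
example; the real suffix is `HoodieTableMetaClient.INPROGRESS_MARKER_EXTN`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class InProgressMarkerFilterSketch {
  // Assumed value, for illustration only.
  static final String INPROGRESS_MARKER_EXTN = ".marker";
  // Mirrors IOType.APPEND.name() in the diff.
  static final String APPEND = "APPEND";

  private InProgressMarkerFilterSketch() {
  }

  // Collects data paths for every in-progress marker, whether or not a
  // completion marker was ever written for the same file.
  static Set<String> dataPathsFromInProgressMarkers(List<String> markerPaths) {
    Set<String> result = new LinkedHashSet<>();
    for (String path : markerPaths) {
      int idx = path.indexOf(INPROGRESS_MARKER_EXTN);
      if (idx >= 0 && !path.endsWith(APPEND)) {
        // Strip the marker suffix to recover the data file path.
        result.add(path.substring(0, idx));
      }
    }
    return result;
  }
}
```

Given `p1/f1.parquet.marker.CREATE`, `p1/f2.parquet.marker.MERGE`,
`p1/f3.log.marker.APPEND`, and a hypothetical `p1/f1.completed.CREATE`, only
`p1/f1.parquet` and `p1/f2.parquet` are returned, so a file from a retried
attempt that never got a completion marker is still picked up for
reconciliation.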
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -151,6 +178,24 @@ public Schema getWriterSchema() {
return writeSchema;
}
+ protected void cleanupDataFile(Path dataFile) throws IOException {
Review Comment:
Why protected? The base WriteHandle class is the only class that accesses
this, right?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -901,6 +901,9 @@ private void startCommit(String instantTime, String
actionType, HoodieTableMetaC
metaClient.getActiveTimeline().createNewInstant(new
HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
instantTime));
}
+
+ // populate marker directory for the commit.
+ WriteMarkersFactory.get(config.getMarkersType(), createTable(config,
hadoopConf), instantTime).createMarkerDir();
Review Comment:
Maybe we can move it into
```
public final HoodieTable initTable(WriteOperationType operationType,
Option<String> instantTime) {
```
in BaseHoodieWriteClient.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -135,18 +155,42 @@ private String translateMarkerToDataPath(String
markerPath) {
return stripMarkerSuffix(rPath);
}
+ public static String stripMarkerSuffix(String path) {
+ return path.substring(0,
path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN));
+ }
+
+ public static String stripOldStyleMarkerSuffix(String path) {
+ // marker file was created by older version of Hudi, with
INPROGRESS_MARKER_EXTN (f1_w1_c1.marker).
+ // Rename to data file by replacing .marker with .parquet.
+ return String.format("%s%s", path.substring(0,
path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)),
+ HoodieFileFormat.PARQUET.getFileExtension());
+ }
+
@Override
public Set<String> allMarkerFilePaths() throws IOException {
Set<String> markerFiles = new HashSet<>();
if (doesMarkerDirExist()) {
FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
-
markerFiles.add(MarkerUtils.stripMarkerFolderPrefix(fileStatus.getPath().toString(),
basePath, instantTime));
+ // Only the inprogres markerFiles are to be included here
+ if
(fileStatus.getPath().toString().contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN))
{
Review Comment:
Same here.
We call allMarkerFilePaths() for rollback purposes, so it makes sense to fetch
only in-progress markers.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath,
String fileName) {
*
* @param partitionPath Partition path
*/
- protected void createMarkerFile(String partitionPath, String dataFileName) {
- WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
- .create(partitionPath, dataFileName, getIOType(), config, fileId,
hoodieTable.getMetaClient().getActiveTimeline());
+ protected void createInProgressMarkerFile(String partitionPath, String
dataFileName, String markerInstantTime) {
+ WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ if (!writeMarkers.doesMarkerDirExist()) {
Review Comment:
In cloud stores, we should try to avoid direct fs calls where feasible.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath,
String fileName) {
*
* @param partitionPath Partition path
*/
- protected void createMarkerFile(String partitionPath, String dataFileName) {
- WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
- .create(partitionPath, dataFileName, getIOType(), config, fileId,
hoodieTable.getMetaClient().getActiveTimeline());
+ protected void createInProgressMarkerFile(String partitionPath, String
dataFileName, String markerInstantTime) {
+ WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ if (!writeMarkers.doesMarkerDirExist()) {
+ throw new HoodieIOException(String.format("Marker root directory absent
: %s/%s (%s)",
+ partitionPath, dataFileName, markerInstantTime));
+ }
+ if (config.enforceFinalizeWriteCheck()
+ && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("",
"FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {
+ throw new HoodieCorruptedDataException("Reconciliation for instant " +
instantTime + " is completed, job is trying to re-write the data files.");
+ }
+ if (config.enforceCompletionMarkerCheck()
+ &&
writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(partitionPath,
fileId, markerInstantTime, getIOType()))) {
+ throw new HoodieIOException("Completed marker file exists for : " +
dataFileName + " (" + instantTime + ")");
+ }
+ writeMarkers.create(partitionPath, dataFileName, getIOType());
+ }
+
+ // visible for testing
+ public void createCompletedMarkerFile(String partition, String
markerInstantTime) throws IOException {
+ try {
+ WriteMarkersFactory.get(config.getMarkersType(), hoodieTable,
instantTime)
+ .createCompletionMarker(partition, fileId, markerInstantTime,
getIOType(), true);
+ } catch (Exception e) {
+ // Clean up the data file, if the marker is already present or marker
directories don't exist.
+ Path partitionPath =
FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partition);
Review Comment:
In which case might we hit this? Can you please help clarify?
That is, the marker dir is not present, but a write handle is still running
and is invoking code to create a completion marker.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath,
String fileName) {
*
* @param partitionPath Partition path
*/
- protected void createMarkerFile(String partitionPath, String dataFileName) {
- WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
- .create(partitionPath, dataFileName, getIOType(), config, fileId,
hoodieTable.getMetaClient().getActiveTimeline());
+ protected void createInProgressMarkerFile(String partitionPath, String
dataFileName, String markerInstantTime) {
+ WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ if (!writeMarkers.doesMarkerDirExist()) {
+ throw new HoodieIOException(String.format("Marker root directory absent
: %s/%s (%s)",
+ partitionPath, dataFileName, markerInstantTime));
+ }
+ if (config.enforceFinalizeWriteCheck()
+ && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("",
"FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {
Review Comment:
I feel we are starting to use markers as a state store.
So far, they have been used only as creation markers for individual data
files. Now we are starting to use them for overall write status, to determine
whether FINALIZE_WRITE has completed.
Did we consider any other options before settling on this approach?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -901,6 +901,9 @@ private void startCommit(String instantTime, String
actionType, HoodieTableMetaC
metaClient.getActiveTimeline().createNewInstant(new
HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
instantTime));
}
+
+ // populate marker directory for the commit.
+ WriteMarkersFactory.get(config.getMarkersType(), createTable(config,
hadoopConf), instantTime).createMarkerDir();
Review Comment:
What's the purpose of this? In this patch we are adding success markers,
right? So I don't see any reason to add this additional step. Also, we are
creating a new table (which might load the timeline) that is never used
anywhere else. Can we avoid this call if it is not really necessary?