This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 95d7489bc5 [HUDI-4517] If no marker type file, fallback to timeline
based marker (#6266)
95d7489bc5 is described below
commit 95d7489bc5712077eceef831de76a1ba2460641e
Author: Sagar Sumit <[email protected]>
AuthorDate: Sun Aug 7 08:39:46 2022 +0530
[HUDI-4517] If no marker type file, fallback to timeline based marker
(#6266)
- If the MARKERS.type file is not present, the logic assumes that direct
markers are stored, which causes read failures in certain cases even when the
timeline-server-based marker is enabled. This PR handles the failure by falling
back to the timeline-server-based marker in such cases.
---
.../table/marker/MarkerBasedRollbackUtils.java | 40 ++++++++++++++++------
.../TestMarkerBasedRollbackStrategy.java | 22 ++++++++++++
.../org/apache/hudi/common/util/MarkerUtils.java | 3 +-
.../service/handlers/marker/MarkerDirState.java | 2 +-
4 files changed, 55 insertions(+), 12 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java
index 9d1f37abdb..4d2f9e4e80 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java
@@ -21,12 +21,13 @@ package org.apache.hudi.table.marker;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.marker.MarkerType;
-import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
@@ -36,10 +37,19 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.table.marker.MarkerType.DIRECT;
+import static
org.apache.hudi.common.table.marker.MarkerType.TIMELINE_SERVER_BASED;
+import static org.apache.hudi.common.util.MarkerUtils.MARKER_TYPE_FILENAME;
+import static org.apache.hudi.common.util.MarkerUtils.readMarkerType;
+import static
org.apache.hudi.common.util.MarkerUtils.readTimelineServerBasedMarkersFromFileSystem;
+
/**
* A utility class for marker-based rollback.
*/
public class MarkerBasedRollbackUtils {
+
+ private static final Logger LOG =
LogManager.getLogger(MarkerBasedRollbackUtils.class);
+
/**
* Gets all marker paths.
*
@@ -54,25 +64,35 @@ public class MarkerBasedRollbackUtils {
String instant, int
parallelism) throws IOException {
String markerDir = table.getMetaClient().getMarkerFolderPath(instant);
FileSystem fileSystem = table.getMetaClient().getFs();
- Option<MarkerType> markerTypeOption =
MarkerUtils.readMarkerType(fileSystem, markerDir);
+ Option<MarkerType> markerTypeOption = readMarkerType(fileSystem,
markerDir);
- // If there is no marker type file "MARKERS.type", we assume "DIRECT"
markers are used
+ // If there is no marker type file "MARKERS.type", first assume "DIRECT"
markers are used.
+ // If not, then fallback to "TIMELINE_SERVER_BASED" markers.
if (!markerTypeOption.isPresent()) {
- WriteMarkers writeMarkers = WriteMarkersFactory.get(MarkerType.DIRECT,
table, instant);
- return new ArrayList<>(writeMarkers.allMarkerFilePaths());
+ WriteMarkers writeMarkers = WriteMarkersFactory.get(DIRECT, table,
instant);
+ try {
+ return new ArrayList<>(writeMarkers.allMarkerFilePaths());
+ } catch (IOException | IllegalArgumentException e) {
+ LOG.warn(String.format("%s not present and %s marker failed with
error: %s. So, falling back to %s marker",
+ MARKER_TYPE_FILENAME, DIRECT, e.getMessage(),
TIMELINE_SERVER_BASED));
+ return getTimelineServerBasedMarkers(context, parallelism, markerDir,
fileSystem);
+ }
}
switch (markerTypeOption.get()) {
case TIMELINE_SERVER_BASED:
// Reads all markers written by the timeline server
- Map<String, Set<String>> markersMap =
- MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
- markerDir, fileSystem, context, parallelism);
- return markersMap.values().stream().flatMap(Collection::stream)
- .collect(Collectors.toCollection(ArrayList::new));
+ return getTimelineServerBasedMarkers(context, parallelism, markerDir,
fileSystem);
default:
throw new HoodieException(
"The marker type \"" + markerTypeOption.get().name() + "\" is not
supported.");
}
}
+
+ private static List<String>
getTimelineServerBasedMarkers(HoodieEngineContext context, int parallelism,
String markerDir, FileSystem fileSystem) {
+ Map<String, Set<String>> markersMap =
readTimelineServerBasedMarkersFromFileSystem(markerDir, fileSystem, context,
parallelism);
+ return markersMap.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
index 6762370952..927f8f3c24 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
@@ -36,6 +36,7 @@ import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy;
+import org.apache.hudi.table.marker.DirectWriteMarkers;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hadoop.fs.FileStatus;
@@ -48,12 +49,16 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
@Tag("functional")
public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
@@ -204,4 +209,21 @@ public class TestMarkerBasedRollbackStrategy extends
HoodieClientTestBase {
rollbackRequests);
}
+ @Test
+ public void
testMarkerBasedRollbackFallbackToTimelineServerWhenDirectMarkerFails() throws
Exception {
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ String f0 = testTable.addRequestedCommit("000")
+ .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
+ testTable.forCommit("001")
+ .withMarkerFile("partA", f0, IOType.APPEND);
+
+ HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
+
+ DirectWriteMarkers writeMarkers = mock(DirectWriteMarkers.class);
+ initMocks(this);
+ when(writeMarkers.allMarkerFilePaths()).thenThrow(new
IOException("Markers.type file not present"));
+ MarkerBasedRollbackStrategy rollbackStrategy = new
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(), "002");
+ List<HoodieRollbackRequest> rollbackRequests =
rollbackStrategy.getRollbackRequests(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION,
"001"));
+ assertEquals(1, rollbackRequests.size());
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
index 555a036b9f..0aff8f594a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
@@ -64,7 +64,8 @@ public class MarkerUtils {
* @return marker file name
*/
public static String stripMarkerFolderPrefix(String fullMarkerPath, String
basePath, String instantTime) {
-
ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
+
ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN),
+ String.format("Using DIRECT markers but marker path does not contain
extension: %s", HoodieTableMetaClient.MARKER_EXTN));
String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
new Path(String.format("%s/%s/%s", basePath,
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
return stripMarkerFolderPrefix(fullMarkerPath, markerRootPath);
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
index 8ba9abf0eb..67e850bb7d 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
@@ -273,7 +273,7 @@ public class MarkerDirState implements Serializable {
private void writeMarkerTypeToFile() {
Path dirPath = new Path(markerDirPath);
try {
- if (!fileSystem.exists(dirPath)) {
+ if (!fileSystem.exists(dirPath) ||
!MarkerUtils.doesMarkerTypeFileExist(fileSystem, markerDirPath)) {
// There is no existing marker directory, create a new directory and
write marker type
fileSystem.mkdirs(dirPath);
MarkerUtils.writeMarkerTypeToFile(MarkerType.TIMELINE_SERVER_BASED,
fileSystem, markerDirPath);