This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch release-0.12.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4dd0a2f46e4c08199b61e4555a06d5a45ad89b6f Author: Sagar Sumit <[email protected]> AuthorDate: Sun Aug 7 08:39:46 2022 +0530 [HUDI-4517] If no marker type file, fallback to timeline based marker (#6266) - If the MARKERS.type file is not present, the logic assumes that direct markers are stored, which causes read failures in certain cases even when timeline-server-based markers are enabled. This PR handles the failure by falling back to timeline-server-based markers in such cases. --- .../table/marker/MarkerBasedRollbackUtils.java | 40 ++++++++++++++++------ .../TestMarkerBasedRollbackStrategy.java | 22 ++++++++++++ .../org/apache/hudi/common/util/MarkerUtils.java | 3 +- .../service/handlers/marker/MarkerDirState.java | 2 +- 4 files changed, 55 insertions(+), 12 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java index 9d1f37abdb..4d2f9e4e80 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java @@ -21,12 +21,13 @@ package org.apache.hudi.table.marker; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.marker.MarkerType; -import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.IOException; import java.util.ArrayList; @@ -36,10 +37,19 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.hudi.common.table.marker.MarkerType.DIRECT; +import static 
org.apache.hudi.common.table.marker.MarkerType.TIMELINE_SERVER_BASED; +import static org.apache.hudi.common.util.MarkerUtils.MARKER_TYPE_FILENAME; +import static org.apache.hudi.common.util.MarkerUtils.readMarkerType; +import static org.apache.hudi.common.util.MarkerUtils.readTimelineServerBasedMarkersFromFileSystem; + /** * A utility class for marker-based rollback. */ public class MarkerBasedRollbackUtils { + + private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackUtils.class); + /** * Gets all marker paths. * @@ -54,25 +64,35 @@ public class MarkerBasedRollbackUtils { String instant, int parallelism) throws IOException { String markerDir = table.getMetaClient().getMarkerFolderPath(instant); FileSystem fileSystem = table.getMetaClient().getFs(); - Option<MarkerType> markerTypeOption = MarkerUtils.readMarkerType(fileSystem, markerDir); + Option<MarkerType> markerTypeOption = readMarkerType(fileSystem, markerDir); - // If there is no marker type file "MARKERS.type", we assume "DIRECT" markers are used + // If there is no marker type file "MARKERS.type", first assume "DIRECT" markers are used. + // If not, then fallback to "TIMELINE_SERVER_BASED" markers. if (!markerTypeOption.isPresent()) { - WriteMarkers writeMarkers = WriteMarkersFactory.get(MarkerType.DIRECT, table, instant); - return new ArrayList<>(writeMarkers.allMarkerFilePaths()); + WriteMarkers writeMarkers = WriteMarkersFactory.get(DIRECT, table, instant); + try { + return new ArrayList<>(writeMarkers.allMarkerFilePaths()); + } catch (IOException | IllegalArgumentException e) { + LOG.warn(String.format("%s not present and %s marker failed with error: %s. 
So, falling back to %s marker", + MARKER_TYPE_FILENAME, DIRECT, e.getMessage(), TIMELINE_SERVER_BASED)); + return getTimelineServerBasedMarkers(context, parallelism, markerDir, fileSystem); + } } switch (markerTypeOption.get()) { case TIMELINE_SERVER_BASED: // Reads all markers written by the timeline server - Map<String, Set<String>> markersMap = - MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( - markerDir, fileSystem, context, parallelism); - return markersMap.values().stream().flatMap(Collection::stream) - .collect(Collectors.toCollection(ArrayList::new)); + return getTimelineServerBasedMarkers(context, parallelism, markerDir, fileSystem); default: throw new HoodieException( "The marker type \"" + markerTypeOption.get().name() + "\" is not supported."); } } + + private static List<String> getTimelineServerBasedMarkers(HoodieEngineContext context, int parallelism, String markerDir, FileSystem fileSystem) { + Map<String, Set<String>> markersMap = readTimelineServerBasedMarkersFromFileSystem(markerDir, fileSystem, context, parallelism); + return markersMap.values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java index 6762370952..927f8f3c24 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java @@ -36,6 +36,7 @@ import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.rollback.BaseRollbackHelper; import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy; +import org.apache.hudi.table.marker.DirectWriteMarkers; 
import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hadoop.fs.FileStatus; @@ -48,12 +49,16 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; @Tag("functional") public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { @@ -204,4 +209,21 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { rollbackRequests); } + @Test + public void testMarkerBasedRollbackFallbackToTimelineServerWhenDirectMarkerFails() throws Exception { + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String f0 = testTable.addRequestedCommit("000") + .getFileIdsWithBaseFilesInPartitions("partA").get("partA"); + testTable.forCommit("001") + .withMarkerFile("partA", f0, IOType.APPEND); + + HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, metaClient); + + DirectWriteMarkers writeMarkers = mock(DirectWriteMarkers.class); + initMocks(this); + when(writeMarkers.allMarkerFilePaths()).thenThrow(new IOException("Markers.type file not present")); + MarkerBasedRollbackStrategy rollbackStrategy = new MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(), "002"); + List<HoodieRollbackRequest> rollbackRequests = rollbackStrategy.getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001")); + assertEquals(1, rollbackRequests.size()); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java index 555a036b9f..0aff8f594a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java @@ -64,7 +64,8 @@ public class MarkerUtils { * @return marker file name */ public static String stripMarkerFolderPrefix(String fullMarkerPath, String basePath, String instantTime) { - ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN)); + ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN), + String.format("Using DIRECT markers but marker path does not contain extension: %s", HoodieTableMetaClient.MARKER_EXTN)); String markerRootPath = Path.getPathWithoutSchemeAndAuthority( new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString(); return stripMarkerFolderPrefix(fullMarkerPath, markerRootPath); diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java index 8ba9abf0eb..67e850bb7d 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java @@ -273,7 +273,7 @@ public class MarkerDirState implements Serializable { private void writeMarkerTypeToFile() { Path dirPath = new Path(markerDirPath); try { - if (!fileSystem.exists(dirPath)) { + if (!fileSystem.exists(dirPath) || !MarkerUtils.doesMarkerTypeFileExist(fileSystem, markerDirPath)) { // There is no existing marker directory, create a new directory and write marker type fileSystem.mkdirs(dirPath); MarkerUtils.writeMarkerTypeToFile(MarkerType.TIMELINE_SERVER_BASED, fileSystem, markerDirPath);
