yihua commented on a change in pull request #3233: URL: https://github.com/apache/hudi/pull/3233#discussion_r686474524
########## File path: hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java ########## @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.timeline.service.handlers.marker; + +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.metrics.Registry; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.StringUtils; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; 
+import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.util.FileIOUtils.closeQuietly; +import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult; + +/** + * Stores the state of a marker directory. + * + * The operations inside this class is designed to be thread-safe. + */ +public class MarkerDirState implements Serializable { + public static final String MARKERS_FILENAME_PREFIX = "MARKERS"; + private static final Logger LOG = LogManager.getLogger(MarkerDirState.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + // Marker directory + private final String markerDirPath; + private final FileSystem fileSystem; + private final Registry metricsRegistry; + // A cached copy of all markers in memory + private final Set<String> allMarkers = new HashSet<>(); + // A cached copy of marker entries in each marker file, stored in StringBuilder + // for efficient appending + // Mapping: {markerFileIndex -> markers} + private final Map<Integer, StringBuilder> fileMarkersMap = new HashMap<>(); + // A list of use status of underlying files storing markers by a thread. + // {@code true} means the file is in use by a {@code BatchCreateMarkerRunnable}. 
+ // Index of the list is used for the filename, i.e., "1" -> "MARKERS1" + private final List<Boolean> threadUseStatus; + // A list of pending futures from async marker creation requests + private final List<MarkerCreationCompletableFuture> markerCreationFutures = new ArrayList<>(); + private final int parallelism; + private final Object lazyInitLock = new Object(); + private final Object markerCreationProcessingLock = new Object(); + private transient HoodieEngineContext hoodieEngineContext; + // Last underlying file index used, for finding the next file index + // in a round-robin fashion + private int lastFileIndexUsed = 0; + private boolean lazyInitComplete; + + public MarkerDirState(String markerDirPath, int markerBatchNumThreads, FileSystem fileSystem, + Registry metricsRegistry, HoodieEngineContext hoodieEngineContext, int parallelism) { + this.markerDirPath = markerDirPath; + this.fileSystem = fileSystem; + this.metricsRegistry = metricsRegistry; + this.hoodieEngineContext = hoodieEngineContext; + this.parallelism = parallelism; + this.threadUseStatus = + Stream.generate(() -> false).limit(markerBatchNumThreads).collect(Collectors.toList()); + this.lazyInitComplete = false; + } + + /** + * @return {@code true} if the marker directory exists in the system. + */ + public boolean exists() { + try { + return fileSystem.exists(new Path(markerDirPath)); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + } + + /** + * @return all markers in the marker directory. + */ + public Set<String> getAllMarkers() { + maybeSyncOnFirstRequest(); + return allMarkers; + } + + /** + * Adds a {@code MarkerCreationCompletableFuture} instance from a marker + * creation request to the queue. + * + * @param future {@code MarkerCreationCompletableFuture} instance. 
+ */ + public void addMarkerCreationFuture(MarkerCreationCompletableFuture future) { + synchronized (markerCreationFutures) { + markerCreationFutures.add(future); + } + } + + /** + * @return the next file index to use in a round-robin fashion, + * or -1 if no file is available. + */ + public int getNextFileIndexToUse() { + int fileIndex = -1; + synchronized (threadUseStatus) { + int nextIndex = (lastFileIndexUsed + 1) % threadUseStatus.size(); + if (!threadUseStatus.get(nextIndex)) { + fileIndex = nextIndex; + threadUseStatus.set(nextIndex, true); + } else { + for (int i = 1; i < threadUseStatus.size(); i++) { + int index = (lastFileIndexUsed + 1 + i) % threadUseStatus.size(); + if (!threadUseStatus.get(index)) { + fileIndex = index; + threadUseStatus.set(index, true); + break; + } + } + } + if (fileIndex >= 0) { + lastFileIndexUsed = fileIndex; + } + } + return fileIndex; + } + + /** + * Marks the file as available to use again. + * + * @param fileIndex file index + */ + public void markFileAsAvailable(int fileIndex) { + synchronized (threadUseStatus) { + threadUseStatus.set(fileIndex, false); + } + } + + /** + * @return futures of pending marker creation requests and removes them from the list. + */ + public List<MarkerCreationCompletableFuture> fetchPendingMarkerCreationRequests() { + if (markerCreationFutures.isEmpty()) { + return new ArrayList<>(); + } + maybeSyncOnFirstRequest(); + + List<MarkerCreationCompletableFuture> pendingFutures; + synchronized (markerCreationFutures) { + pendingFutures = new ArrayList<>(markerCreationFutures); + markerCreationFutures.clear(); + } + return pendingFutures; + } + + /** + * Processes pending marker creation requests. 
+ * + * @param pendingMarkerCreationFutures futures of pending marker creation requests + * @param fileIndex file index to use to write markers + */ + public void processMarkerCreationRequests( + final List<MarkerCreationCompletableFuture> pendingMarkerCreationFutures, int fileIndex) { + if (pendingMarkerCreationFutures.isEmpty()) { + markFileAsAvailable(fileIndex); + return; + } + + LOG.debug("timeMs=" + System.currentTimeMillis() + " markerDirPath=" + markerDirPath + + " numRequests=" + pendingMarkerCreationFutures.size() + " fileIndex=" + fileIndex); + + synchronized (markerCreationProcessingLock) { + for (MarkerCreationCompletableFuture future : pendingMarkerCreationFutures) { + String markerName = future.getMarkerName(); + boolean exists = allMarkers.contains(markerName); + if (!exists) { + allMarkers.add(markerName); + StringBuilder stringBuilder = fileMarkersMap.computeIfAbsent(fileIndex, k -> new StringBuilder(16384)); + stringBuilder.append(markerName); + stringBuilder.append('\n'); + } + future.setResult(!exists); + } + } + flushMarkersToFile(fileIndex); Review comment: Also, since we time out the requests in the batch if an S3 write IOException happens, the executors retry with new marker creation requests. In such a case, for rollback, the timeline server returns extra invalid markers, which are not useful anyway and do not affect the functionality. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
