[ 
https://issues.apache.org/jira/browse/HUDI-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397041#comment-17397041
 ] 

ASF GitHub Bot commented on HUDI-1138:
--------------------------------------

yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r686473475



##########
File path: hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.util.FileIOUtils.closeQuietly;
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * Stores the state of a marker directory.
+ *
+ * The operations inside this class are designed to be thread-safe.
+ */
+public class MarkerDirState implements Serializable {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final Logger LOG = LogManager.getLogger(MarkerDirState.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  // Marker directory
+  private final String markerDirPath;
+  private final FileSystem fileSystem;
+  private final Registry metricsRegistry;
+  // A cached copy of all markers in memory
+  private final Set<String> allMarkers = new HashSet<>();
+  // A cached copy of marker entries in each marker file, stored in StringBuilder
+  // for efficient appending
+  // Mapping: {markerFileIndex -> markers}
+  private final Map<Integer, StringBuilder> fileMarkersMap = new HashMap<>();
+  // A list of use status of underlying files storing markers by a thread.
+  // {@code true} means the file is in use by a {@code BatchCreateMarkerRunnable}.
+  // Index of the list is used for the filename, i.e., "1" -> "MARKERS1"
+  private final List<Boolean> threadUseStatus;
+  // A list of pending futures from async marker creation requests
+  private final List<MarkerCreationCompletableFuture> markerCreationFutures = new ArrayList<>();
+  private final int parallelism;
+  private final Object lazyInitLock = new Object();
+  private final Object markerCreationProcessingLock = new Object();
+  private transient HoodieEngineContext hoodieEngineContext;
+  // Last underlying file index used, for finding the next file index
+  // in a round-robin fashion
+  private int lastFileIndexUsed = 0;
+  private boolean lazyInitComplete;
+
+  public MarkerDirState(String markerDirPath, int markerBatchNumThreads, FileSystem fileSystem,
+                        Registry metricsRegistry, HoodieEngineContext hoodieEngineContext, int parallelism) {
+    this.markerDirPath = markerDirPath;
+    this.fileSystem = fileSystem;
+    this.metricsRegistry = metricsRegistry;
+    this.hoodieEngineContext = hoodieEngineContext;
+    this.parallelism = parallelism;
+    this.threadUseStatus =
+        Stream.generate(() -> false).limit(markerBatchNumThreads).collect(Collectors.toList());
+    this.lazyInitComplete = false;
+  }
+
+  /**
+   * @return  {@code true} if the marker directory exists in the system.
+   */
+  public boolean exists() {
+    try {
+      return fileSystem.exists(new Path(markerDirPath));
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * @return all markers in the marker directory.
+   */
+  public Set<String> getAllMarkers() {
+    maybeSyncOnFirstRequest();
+    return allMarkers;
+  }
+
+  /**
+   * Adds a {@code MarkerCreationCompletableFuture} instance from a marker
+   * creation request to the queue.
+   *
+   * @param future  {@code MarkerCreationCompletableFuture} instance.
+   */
+  public void addMarkerCreationFuture(MarkerCreationCompletableFuture future) {
+    synchronized (markerCreationFutures) {
+      markerCreationFutures.add(future);
+    }
+  }
+
+  /**
+   * @return the next file index to use in a round-robin fashion,
+   * or -1 if no file is available.
+   */
+  public int getNextFileIndexToUse() {
+    int fileIndex = -1;
+    synchronized (threadUseStatus) {

Review comment:
       I changed the synchronized blocks to use only two locks: (1) `markerCreationFutures` for request-queue operations and (2) `markerCreationProcessingLock` for all other processing.
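
To illustrate the two-lock arrangement described in this comment, here is a minimal sketch. It is not the code from the PR: the class `TwoLockMarkerState`, the `PendingRequest` holder, and the methods `submit`, `processPending`, and `markerCount` are hypothetical names, and only the two lock names are taken from the comment. The queue lock is held just long enough to enqueue or drain requests, while the processing lock guards the in-memory marker state.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for MarkerDirState; not the PR's implementation.
class TwoLockMarkerState {
  // Lock 1: the queue object itself guards all request-queue operations.
  private final List<PendingRequest> markerCreationFutures = new ArrayList<>();
  // Lock 2: guards all state touched while a batch is being processed.
  private final Object markerCreationProcessingLock = new Object();
  private final Set<String> allMarkers = new HashSet<>();

  // Hypothetical holder pairing a marker name with the future returned to the caller.
  static final class PendingRequest {
    final String markerName;
    final CompletableFuture<Boolean> result = new CompletableFuture<>();

    PendingRequest(String markerName) {
      this.markerName = markerName;
    }
  }

  // Enqueues a request; only the queue lock is taken, so callers never wait on processing.
  CompletableFuture<Boolean> submit(String markerName) {
    PendingRequest request = new PendingRequest(markerName);
    synchronized (markerCreationFutures) {
      markerCreationFutures.add(request);
    }
    return request.result;
  }

  // Drains the queue under lock 1, then applies the batch under lock 2.
  int processPending() {
    List<PendingRequest> batch;
    synchronized (markerCreationFutures) {
      batch = new ArrayList<>(markerCreationFutures);
      markerCreationFutures.clear();
    }
    synchronized (markerCreationProcessingLock) {
      for (PendingRequest request : batch) {
        // Completes with true if the marker is new, false if it already existed.
        request.result.complete(allMarkers.add(request.markerName));
      }
    }
    return batch.size();
  }

  int markerCount() {
    synchronized (markerCreationProcessingLock) {
      return allMarkers.size();
    }
  }
}
```

In this sketch the two locks are never held at the same time, so there is no lock-ordering concern between the request path and the batch-processing path.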






> Re-implement marker files via timeline server
> ---------------------------------------------
>
>                 Key: HUDI-1138
>                 URL: https://issues.apache.org/jira/browse/HUDI-1138
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Vinoth Chandar
>            Assignee: Ethan Guo
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>
> Even though one can argue that RFC-15/consolidated metadata removes the need for 
> deleting partial files written due to Spark task failures/stage retries, it 
> will still leave extra files inside the table (and users will pay for them 
> every month), so we need the marker mechanism to be able to delete these 
> partial files. 
> Here we explore whether we can improve the current marker file mechanism, which 
> creates one marker file per data file written, by 
> delegating the createMarker() call to the driver/timeline server and having it 
> write marker metadata to a single file handle that is flushed for 
> durability guarantees.
>  
> P.S.: I was tempted to think the Spark listener mechanism could help us deal with 
> failed tasks, but it offers no guarantees: the writer job could die without 
> deleting a partial file. That is, it can improve things but can't provide 
> guarantees.
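
The single-file idea in the description above can be sketched roughly as follows. This is only an illustration under stated assumptions: it uses a local file through `java.nio` rather than a Hadoop `FileSystem`, and the class `SingleFileMarkerLog` with its `createMarker` and `readMarkers` methods is a hypothetical name, not part of Hudi. Each marker is appended as one line to a shared file handle and forced to storage before the call returns.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical sketch: one append-only file holds all marker entries, one per line.
class SingleFileMarkerLog implements AutoCloseable {
  private final FileChannel channel;

  SingleFileMarkerLog(Path file) throws IOException {
    this.channel = FileChannel.open(file,
        StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
  }

  // Appends one marker entry and forces it to storage before returning.
  synchronized void createMarker(String markerName) throws IOException {
    ByteBuffer buffer = ByteBuffer.wrap((markerName + "\n").getBytes(StandardCharsets.UTF_8));
    while (buffer.hasRemaining()) {
      channel.write(buffer);
    }
    // Flush file content and metadata so the entry survives a crash of the writer.
    channel.force(true);
  }

  // Reads back every marker entry recorded in the file.
  static List<String> readMarkers(Path file) throws IOException {
    return Files.readAllLines(file, StandardCharsets.UTF_8);
  }

  @Override
  public synchronized void close() throws IOException {
    channel.close();
  }
}
```

Forcing on every call keeps the sketch simple; batching several entries per flush would trade some latency for fewer sync operations.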



