[ 
https://issues.apache.org/jira/browse/HUDI-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380329#comment-17380329
 ] 

ASF GitHub Bot commented on HUDI-1138:
--------------------------------------

yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r669295889



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -210,6 +212,21 @@
       .defaultValue("1500")
       .withDocumentation("");
 
+  public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+      .key("hoodie.markers.io.mode")
+      .defaultValue(MarkerIOMode.TIMELINE_BASED.toString())

Review comment:
       Sounds good.  I changed the default to "DIRECT" for now.

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  private static final long MARGIN_TIME_MS = 10L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // {markerDirPath -> all markers}
+  private final Map<String, Set<String>> allMarkersMap = new HashMap<>();
+  // {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map<String, Map<Integer, StringBuilder>> fileMarkersMap = new 
HashMap<>();
+  private final List<CreateMarkerCompletableFuture> createMarkerFutures = new 
ArrayList<>();
+  private final List<Boolean> isMarkerFileInUseList;
+  private final long batchIntervalMs;
+  private final int parallelism;
+  private volatile Object createMarkerRequestlockObject = new Object();
+  private long nextBatchProcessTimeMs = 0L;
+
+  public MarkerHandler(Configuration conf, FileSystem fileSystem, 
FileSystemViewManager viewManager, Registry metricsRegistry,
+                       int batchNumThreads, long batchIntervalMs, int 
parallelism) throws IOException {
+    super(conf, fileSystem, viewManager);
+    LOG.info("*** MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+    LOG.info("*** MarkerHandler Params: batchNumThreads=" + batchNumThreads + 
" batchIntervalMs=" + batchIntervalMs + "ms");
+    this.metricsRegistry = metricsRegistry;
+    this.batchIntervalMs = batchIntervalMs;
+    this.parallelism = parallelism;
+    this.executorService = Executors.newScheduledThreadPool(batchNumThreads);
+    List<Boolean> isMarkerFileInUseList = new ArrayList<>(batchNumThreads);
+    for (int i = 0; i < batchNumThreads; i++) {
+      isMarkerFileInUseList.add(false);
+    }
+    this.isMarkerFileInUseList = 
Collections.synchronizedList(isMarkerFileInUseList);
+  }
+
+  public Set<String> getAllMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>());
+  }
+
+  public Set<String> getCreateAndMergeMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>()).stream()
+        .filter(markerName -> !markerName.endsWith(IOType.APPEND.name()))
+        .collect(Collectors.toSet());
+  }
+
+  public CompletableFuture<String> createMarker(Context context, String 
markerDirPath, String markerName) {
+    LOG.info("Request: create marker " + markerDirPath + " " + markerName);
+    CreateMarkerCompletableFuture future = new 
CreateMarkerCompletableFuture(context, markerDirPath, markerName);
+    synchronized (createMarkerRequestlockObject) {
+      createMarkerFutures.add(future);
+      long currTimeMs = System.currentTimeMillis();
+      if (currTimeMs >= nextBatchProcessTimeMs - MARGIN_TIME_MS) {
+        nextBatchProcessTimeMs += batchIntervalMs * (Math.max((currTimeMs - 
nextBatchProcessTimeMs) / batchIntervalMs + 1L, 1L));
+
+        long waitMs = nextBatchProcessTimeMs - System.currentTimeMillis();
+        executorService.schedule(
+            new BatchCreateMarkerRunnable(), Math.max(0L, waitMs), 
TimeUnit.MILLISECONDS);
+        LOG.info("Wait for " + waitMs + " ms, next batch time: " + 
nextBatchProcessTimeMs);
+      }
+    }
+    return future;
+  }
+
+  public Boolean deleteMarkers(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          for (String subPathStr: markerDirSubPaths) {
+            fileSystem.delete(new Path(subPathStr), true);
+          }
+        }
+
+        boolean result = fileSystem.delete(markerDirPath, true);
+        LOG.info("Removing marker directory at " + markerDirPath);
+        return result;
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+    return false;
+  }
+
+  private Set<String> getAllMarkersFromFile(String markerDirPath) {
+    LOG.info("Get all markers from " + markerDirPath);
+    Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME);
+    Set<String> markers = new HashSet<>();
+    try {
+      if (fileSystem.exists(markersFilePath)) {

Review comment:
       This method is not used in the current logic.  I'm going to fix this 
once the marker file read logic is integrated into the `MarkerHandler`.

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+  private Set<String> getAllMarkersFromFile(String markerDirPath) {
+    LOG.info("Get all markers from " + markerDirPath);
+    Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME);
+    Set<String> markers = new HashSet<>();
+    try {
+      if (fileSystem.exists(markersFilePath)) {
+        LOG.info("Marker file exists: " + markersFilePath.toString());
+        FSDataInputStream fsDataInputStream = fileSystem.open(markersFilePath);
+        BufferedReader bufferedReader = new BufferedReader(new 
InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
+        markers = bufferedReader.lines().collect(Collectors.toSet());
+        bufferedReader.close();
+        fsDataInputStream.close();
+      } else {
+        LOG.info("Marker file not exist: " + markersFilePath.toString());
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read MARKERS file " + 
markerDirPath, e);
+    }
+    return markers;
+  }
+
+  private class CreateMarkerCompletableFuture extends 
CompletableFuture<String> {
+    private Context context;
+    private final String markerDirPath;
+    private final String markerName;
+    private boolean result;
+    private final long startTimeMs;
+
+    public CreateMarkerCompletableFuture(Context context, String 
markerDirPath, String markerName) {
+      super();
+      this.startTimeMs = System.currentTimeMillis();
+      this.context = context;
+      this.markerDirPath = markerDirPath;
+      this.markerName = markerName;
+      this.result = false;
+    }
+
+    public Context getContext() {
+      return context;
+    }
+
+    public String getMarkerDirPath() {
+      return markerDirPath;
+    }
+
+    public String getMarkerName() {
+      return markerName;
+    }
+
+    public boolean getResult() {
+      return result;
+    }
+
+    public void setResult(boolean result) {
+      LOG.info("*** Request queued for " + (System.currentTimeMillis() - 
startTimeMs) + " ms");
+      this.result = result;
+    }
+  }
+
+  private class BatchCreateMarkerRunnable implements Runnable {
+
+    @Override
+    public void run() {
+      LOG.info("Start processing create marker requests");
+      long startTimeMs = System.currentTimeMillis();
+      List<CreateMarkerCompletableFuture> futuresToRemove = new ArrayList<>();
+      Set<String> updatedMarkerDirPaths = new HashSet<>();
+      int markerFileIndex = -1;
+      synchronized (isMarkerFileInUseList) {
+        for (int i = 0; i < isMarkerFileInUseList.size(); i++) {
+          if (!isMarkerFileInUseList.get(i)) {
+            markerFileIndex = i;
+            isMarkerFileInUseList.set(i, true);
+            break;
+          }
+        }
+        if (markerFileIndex < 0) {
+          LOG.info("All marker files are busy, skip batch processing of create 
marker requests in " + (System.currentTimeMillis() - startTimeMs) + " ms");
+          return;
+        }
+      }
+
+      LOG.info("timeMs=" + System.currentTimeMillis() + " markerFileIndex=" + 
markerFileIndex);
+      synchronized (createMarkerRequestlockObject) {
+        LOG.info("Iterating through existing requests, size=" + 
createMarkerFutures.size());
+        for (CreateMarkerCompletableFuture future : createMarkerFutures) {
+          String markerDirPath = future.getMarkerDirPath();
+          String markerName = future.getMarkerName();
+          LOG.info("--- markerDirPath=" + markerDirPath + " markerName=" + 
markerName);
+          Set<String> allMarkers = 
allMarkersMap.computeIfAbsent(markerDirPath, k -> new HashSet<>());
+          StringBuilder stringBuilder = 
fileMarkersMap.computeIfAbsent(markerDirPath, k -> new HashMap<>())
+              .computeIfAbsent(markerFileIndex, k -> new StringBuilder(16384));

Review comment:
       This size, `16384`, is the initial capacity of the `StringBuilder`.  
Once the content grows beyond that capacity, the `StringBuilder` internally 
allocates a larger buffer (roughly 2x) and copies the existing content over.
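
A minimal sketch of that growth behavior, for illustration only. The class and 
method names are hypothetical and not part of the PR, and the exact growth 
factor is a JDK implementation detail, so the sketch only reports the 
capacity before and after overflowing the initial allocation.

```java
final class StringBuilderCapacityDemo {
  // Returns {capacity right after construction, capacity after appending
  // one more char than the initial capacity can hold}.
  static int[] capacities(int initialCapacity) {
    StringBuilder sb = new StringBuilder(initialCapacity);
    int before = sb.capacity();
    for (int i = 0; i <= initialCapacity; i++) {
      sb.append('x'); // initialCapacity + 1 chars in total
    }
    return new int[] {before, sb.capacity()};
  }

  public static void main(String[] args) {
    int[] c = capacities(16384);
    System.out.println("initial=" + c[0] + " afterOverflow=" + c[1]);
  }
}
```

Choosing a large initial capacity only avoids the early reallocations; the 
builder still grows on its own once a batch of markers exceeds it.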

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -210,6 +212,21 @@
       .defaultValue("1500")
       .withDocumentation("");
 
+  public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+      .key("hoodie.markers.io.mode")
+      .defaultValue(MarkerIOMode.TIMELINE_BASED.toString())
+      .withDocumentation("");
+
+  public static final ConfigProperty<Integer> 
MARKERS_TIMELINE_BASED_BATCH_THREAD = ConfigProperty

Review comment:
       Done.

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
##########
@@ -392,6 +401,38 @@ private void registerFileSlicesAPI() {
     }, true));
   }
 
+  private void registerMarkerAPI() {
+    app.get(RemoteHoodieTableFileSystemView.ALL_MARKERS_URL, new 
ViewHandler(ctx -> {
+      metricsRegistry.add("ALL_MARKERS", 1);
+      Set<String> markers = markerHandler.getAllMarkers(
+          
ctx.queryParam(RemoteHoodieTableFileSystemView.MARKER_DIR_PATH_PARAM, ""));
+      writeValueAsString(ctx, markers);
+    }, false));
+
+    app.get(RemoteHoodieTableFileSystemView.CREATE_AND_MERGE_MARKERS_URL, new 
ViewHandler(ctx -> {
+      metricsRegistry.add("CREATE_AND_MERGE_MARKERS", 1);
+      Set<String> markers = markerHandler.getCreateAndMergeMarkers(
+          
ctx.queryParam(RemoteHoodieTableFileSystemView.MARKER_DIR_PATH_PARAM, ""));
+      writeValueAsString(ctx, markers);
+    }, false));
+
+    app.post(RemoteHoodieTableFileSystemView.CREATE_MARKER_URL, new 
ViewHandler(ctx -> {
+      metricsRegistry.add("CREATE_MARKER", 1);
+      ctx.result(markerHandler.createMarker(
+          ctx,
+          
ctx.queryParam(RemoteHoodieTableFileSystemView.MARKER_DIR_PATH_PARAM, ""),
+          ctx.queryParam(RemoteHoodieTableFileSystemView.MARKER_NAME_PARAM, 
"")));
+      //writeValueAsString(ctx, success);

Review comment:
       This line should not be used for async requests, and marker creation 
requests are async.  I'll remove the line.
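
A minimal sketch of the async pattern this refers to, under simplifying 
assumptions: the request handler returns a `CompletableFuture` immediately and 
a scheduled batch task completes it later, so nothing should be written to the 
response synchronously. `BatchedMarkerSketch` and its members are hypothetical 
names; the sketch leaves out Javalin, the marker files, and the per-directory 
bookkeeping in `MarkerHandler`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class BatchedMarkerSketch {
  private final List<CompletableFuture<String>> pending = new ArrayList<>();
  private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
  private boolean batchScheduled = false;

  // Queues the request and returns at once; the caller gets the result
  // only when the next batch run completes the future.
  synchronized CompletableFuture<String> createMarker(String markerName, long batchIntervalMs) {
    CompletableFuture<String> future = new CompletableFuture<>();
    pending.add(future);
    if (!batchScheduled) {
      batchScheduled = true;
      executor.schedule(this::processBatch, batchIntervalMs, TimeUnit.MILLISECONDS);
    }
    return future;
  }

  // Completes every queued request in one pass. A real handler would
  // persist the marker names before responding.
  private synchronized void processBatch() {
    for (CompletableFuture<String> future : pending) {
      future.complete("true");
    }
    pending.clear();
    batchScheduled = false;
  }

  void shutdown() {
    executor.shutdown();
  }
}
```

Because the response body is the eventual value of the future, a synchronous 
write in the handler would run before the batch has processed the request.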

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -210,6 +212,21 @@
       .defaultValue("1500")
       .withDocumentation("");
 
+  public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+      .key("hoodie.markers.io.mode")
+      .defaultValue(MarkerIOMode.TIMELINE_BASED.toString())

Review comment:
       I added "0.9.0" as the min version.

##########
File path: 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java
##########
@@ -47,8 +49,9 @@ public JavaMarkerBasedRollbackStrategy(HoodieTable<T, 
List<HoodieRecord<T>>, Lis
   @Override
   public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
     try {
-      MarkerFiles markerFiles = new MarkerFiles(table, 
instantToRollback.getTimestamp());
-      List<HoodieRollbackStat> rollbackStats = 
context.map(markerFiles.allMarkerFilePaths(), markerFilePath -> {
+      MarkerFiles directMarkerFiles =

Review comment:
       Yes.  Somehow I missed this before publishing the WIP PR.

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+  public Boolean deleteMarkers(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          for (String subPathStr: markerDirSubPaths) {
+            fileSystem.delete(new Path(subPathStr), true);
+          }
+        }
+
+        boolean result = fileSystem.delete(markerDirPath, true);

Review comment:
       I plan to delete the marker files / subpaths concurrently, which is why 
I list the subpaths first.  I'll add this after I address all the comments.
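
A rough sketch of what listing the subpaths first makes possible, assuming a 
local directory and `java.nio.file` as a stand-in for the Hadoop `FileSystem` 
API used in the PR. `ConcurrentDeleteSketch` is a hypothetical name and this is 
not the planned implementation: each direct child is deleted in parallel, then 
the now-empty directory is removed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ConcurrentDeleteSketch {
  // Recursively deletes one subpath, children before parents.
  private static void deleteRecursively(Path path) {
    try (Stream<Path> walk = Files.walk(path)) {
      walk.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Lists the direct children, deletes them in parallel, then removes
  // the directory itself. Returns false if the directory does not exist.
  static boolean deleteMarkerDir(Path markerDir) {
    if (!Files.exists(markerDir)) {
      return false;
    }
    List<Path> subPaths;
    try (Stream<Path> children = Files.list(markerDir)) {
      subPaths = children.collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    subPaths.parallelStream().forEach(ConcurrentDeleteSketch::deleteRecursively);
    try {
      return Files.deleteIfExists(markerDir);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```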

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -54,6 +54,7 @@
 import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
 import 
org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.marker.MarkerFiles;

Review comment:
       This is necessary since I changed the package of the `MarkerFiles` 
class.  Otherwise, the code doesn't compile.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
##########
@@ -84,7 +88,10 @@ private FileSystemViewManager createViewManager() {
   }
 
   public void startServer() throws IOException {
-    server = new TimelineService(preferredPort, viewManager, 
hadoopConf.newCopy(), numThreads, shouldCompressOutput, useAsync);
+    server = new TimelineService(preferredPort, viewManager,
+        FSUtils.getFs(basePath, hadoopConf.newCopy()), hadoopConf.newCopy(), 
numThreads, shouldCompressOutput, useAsync,

Review comment:
       I cannot use a `HoodieWriteConfig` instance in the `TimelineService` 
class, since the `hudi-client-common` module, which has `HoodieWriteConfig`, 
depends on the `hudi-timeline-service` module, which has the `TimelineService` 
class.  Using it there would create a cyclic module dependency.  Instead, I use 
a `TimelineService.Config` instance to pass the configs.
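
A minimal sketch of this arrangement, with both "modules" collapsed into one 
file for illustration. All class names, fields, defaults, and the property key 
here are hypothetical and do not mirror the real Hudi classes: the service side 
only knows its own config holder, and the client side copies the values it 
needs into that holder.

```java
import java.util.Properties;

final class ServiceConfigSketch {
  // Service module: a plain holder with no client-side types.
  static final class ServiceConfig {
    int markerBatchNumThreads = 20;    // hypothetical default
    long markerBatchIntervalMs = 50L;  // hypothetical default
  }

  // Service module: depends only on ServiceConfig.
  static final class Service {
    final ServiceConfig config;

    Service(ServiceConfig config) {
      this.config = config;
    }
  }

  // Client module: the richer, client-only config type.
  static final class ClientWriteConfig {
    final Properties props = new Properties();

    int getInt(String key, int defaultValue) {
      return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
    }
  }

  // Client-side glue: translate the client config into the service's
  // own config type, so the service never imports ClientWriteConfig.
  static Service startService(ClientWriteConfig writeConfig) {
    ServiceConfig serviceConfig = new ServiceConfig();
    serviceConfig.markerBatchNumThreads = writeConfig.getInt(
        "example.markers.batch.threads", serviceConfig.markerBatchNumThreads);
    return new Service(serviceConfig);
  }
}
```

The dependency arrow stays one-directional: the client knows about the service 
config, never the other way around.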

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
##########
@@ -443,6 +452,51 @@ public boolean refresh() {
     }
   }
 
+  @Override
+  public Set<String> getAllMarkerFilePaths(String markerDirPath) {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath);
+    try {
+      return executeRequest(ALL_MARKERS_URL, paramsMap, new 
TypeReference<Set<String>>() {}, RequestMethod.GET);

Review comment:
       Done.





> Re-implement marker files via timeline server
> ---------------------------------------------
>
>                 Key: HUDI-1138
>                 URL: https://issues.apache.org/jira/browse/HUDI-1138
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Vinoth Chandar
>            Assignee: Ethan Guo
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>
> Even though you can argue that RFC-15/consolidated metadata removes the need 
> for deleting partial files written due to Spark task failures/stage retries, 
> it will still leave extra files inside the table (and users will pay for 
> them every month), so we need the marker mechanism to be able to delete 
> these partial files. 
> Here we explore whether we can improve the current marker file mechanism, 
> which creates one marker file per data file written, by delegating the 
> createMarker() call to the driver/timeline server and having it write marker 
> metadata into a single file handle that is flushed for durability 
> guarantees.
>  
> P.S.: I was tempted to think the Spark listener mechanism could help us deal 
> with failed tasks, but it has no guarantees: the writer job could die 
> without deleting a partial file, i.e., it can improve things but can't 
> provide guarantees.


