[ 
https://issues.apache.org/jira/browse/HUDI-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385116#comment-17385116
 ] 

ASF GitHub Bot commented on HUDI-1138:
--------------------------------------

nsivabalan commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r674311907



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineBasedMarkerFiles.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Marker file operations that use the timeline service as a proxy to create and 
delete marker files.
+ * Each data file has a corresponding marker entry, which is stored in a 
limited number of
+ * marker files maintained by the timeline service (each marker file contains 
multiple marker
+ * entries).
+ */
+public class TimelineBasedMarkerFiles extends MarkerFiles {
+  private static final Logger LOG = 
LogManager.getLogger(TimelineBasedMarkerFiles.class);
+  private final RemoteHoodieTableFileSystemView remoteFSView;
+
+  public TimelineBasedMarkerFiles(String basePath, String markerFolderPath, 
String instantTime, RemoteHoodieTableFileSystemView remoteFSView) {
+    super(basePath, markerFolderPath, instantTime);
+    this.remoteFSView = remoteFSView;
+  }
+
+  public TimelineBasedMarkerFiles(HoodieTable table, String instantTime) {
+    this(table.getMetaClient().getBasePath(),
+        table.getMetaClient().getMarkerFolderPath(instantTime),
+        instantTime,
+        FileSystemViewManager.createRemoteFileSystemView(
+            table.getContext().getHadoopConf(),
+            table.getConfig().getViewStorageConfig(), table.getMetaClient())
+    );
+  }
+
+  @Override
+  public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) 
{
+    return remoteFSView.deleteMarkerDir(markerDirPath.toString());
+  }
+
+  @Override
+  public boolean doesMarkerDirExist() {
+    return remoteFSView.doesMarkerDirExist(markerDirPath.toString());
+  }
+
+  @Override
+  public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, 
int parallelism) throws IOException {
+    Set<String> markerPaths = 
remoteFSView.getCreateAndMergeMarkerFilePaths(markerDirPath.toString());
+    return 
markerPaths.stream().map(MarkerFiles::stripMarkerSuffix).collect(Collectors.toSet());
+  }
+
+  @Override
+  public Set<String> allMarkerFilePaths() {
+    return remoteFSView.getAllMarkerFilePaths(markerDirPath.toString());
+  }
+
+  @Override
+  protected Option<Path> create(String partitionPath, String dataFileName, 
IOType type, boolean checkIfExists) {
+    LOG.info("[timeline-based] Create marker file : " + partitionPath + " " + 
dataFileName);
+    long startTimeMs = System.currentTimeMillis();
+    String markerFileName = getMarkerFileName(dataFileName, type);
+    boolean success = remoteFSView.createMarker(markerDirPath.toString(), 
String.format("%s/%s", partitionPath, markerFileName));
+    LOG.info("[timeline-based] Created marker file in " + 
(System.currentTimeMillis() - startTimeMs) + " ms");

Review comment:
       I see we do info logging twice here for every marker creation. Is that 
intentional?

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
##########
@@ -213,6 +213,6 @@ private void performRollbackAndValidate(boolean 
isUsingMarkers, HoodieWriteConfi
           String.format("%s:%s/%s", this.fs.getScheme(), basePath, 
rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0)));
     }
 
-    assertFalse(new MarkerFiles(table, 
commitInstant.getTimestamp()).doesMarkerDirExist());
+    assertFalse(new DirectMarkerFiles(table, 
commitInstant.getTimestamp()).doesMarkerDirExist());

Review comment:
       Maybe we should parametrize this test to run against both strategies.
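
One way to read this suggestion, sketched in plain Java so that it stands alone: run the same marker-directory check once per strategy. The names MarkerStrategy, MarkerDirChecker and checkerFor are hypothetical stand-ins, not Hudi classes; in the actual test this would more likely be a JUnit 5 @ParameterizedTest over the two marker implementations.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for the two marker strategies under review.
enum MarkerStrategy { DIRECT, TIMELINE_BASED }

// Hypothetical minimal view of what the rollback test needs from a marker implementation.
interface MarkerDirChecker {
  boolean doesMarkerDirExist(String instantTime);
}

class MarkerStrategyChecks {
  // Instants whose marker directory still exists; shared by both stand-in strategies.
  static final Set<String> EXISTING_MARKER_DIRS = new HashSet<>();

  // Stand-in factory: a real test would construct the direct or the
  // timeline-based implementation here, depending on the strategy.
  static MarkerDirChecker checkerFor(MarkerStrategy strategy) {
    return instantTime -> EXISTING_MARKER_DIRS.contains(instantTime);
  }

  // Runs the same check for every strategy and returns the strategies
  // for which the marker directory is still present.
  static List<MarkerStrategy> strategiesWithLeftoverMarkerDir(String instantTime) {
    List<MarkerStrategy> failures = new ArrayList<>();
    for (MarkerStrategy strategy : MarkerStrategy.values()) {
      if (checkerFor(strategy).doesMarkerDirExist(instantTime)) {
        failures.add(strategy);
      }
    }
    return failures;
  }
}
```

The rollback assertion then becomes "the returned list is empty", and a failure names the strategy that left its marker directory behind.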

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
##########
@@ -68,30 +72,37 @@
   private final TimelineHandler instantHandler;
   private final FileSliceHandler sliceHandler;
   private final BaseFileHandler dataFileHandler;
+  private final MarkerHandler markerHandler;
   private Registry metricsRegistry = Registry.getRegistry("TimelineService");
   private ScheduledExecutorService asyncResultService = 
Executors.newSingleThreadScheduledExecutor();
   private final boolean useAsync;
 
-  public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager 
viewManager, boolean useAsync) throws IOException {
+  public RequestHandler(Javalin app, Configuration conf, HoodieEngineContext 
hoodieEngineContext,

Review comment:
       Let's also add a follow-up item to see if we can reduce the args here. 
Does passing in TimelineService.Config make sense here, or is there some other way?
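
A sketch of the follow-up idea: bundle the marker-related settings into one object so the constructor takes a single argument instead of several. MarkerSettings and MarkerHandlerSketch are hypothetical names; the fields are modeled on the batchNumThreads, batchIntervalMs and parallelism parameters of MarkerHandler in this PR. Whether TimelineService.Config is the right carrier is the open question.

```java
// Hypothetical settings holder; field names mirror the MarkerHandler
// constructor arguments in this PR.
final class MarkerSettings {
  final int batchNumThreads;
  final long batchIntervalMs;
  final int parallelism;

  private MarkerSettings(Builder builder) {
    this.batchNumThreads = builder.batchNumThreads;
    this.batchIntervalMs = builder.batchIntervalMs;
    this.parallelism = builder.parallelism;
  }

  static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    // Placeholder defaults chosen for this sketch only; they are not Hudi's defaults.
    private int batchNumThreads = 1;
    private long batchIntervalMs = 100L;
    private int parallelism = 1;

    Builder batchNumThreads(int value) {
      this.batchNumThreads = value;
      return this;
    }

    Builder batchIntervalMs(long value) {
      this.batchIntervalMs = value;
      return this;
    }

    Builder parallelism(int value) {
      this.parallelism = value;
      return this;
    }

    MarkerSettings build() {
      if (batchNumThreads < 1 || batchIntervalMs < 0 || parallelism < 1) {
        throw new IllegalArgumentException("Invalid marker settings");
      }
      return new MarkerSettings(this);
    }
  }
}

// Hypothetical consumer: one settings argument replaces three primitive arguments.
final class MarkerHandlerSketch {
  private final MarkerSettings settings;

  MarkerHandlerSketch(MarkerSettings settings) {
    this.settings = settings;
  }

  String describe() {
    return "batchNumThreads=" + settings.batchNumThreads
        + " batchIntervalMs=" + settings.batchIntervalMs + "ms"
        + " parallelism=" + settings.parallelism;
  }
}
```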

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronously, while other types of 
requests
+ * are handled synchronously.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same time, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *           |-----| batch interval
+ * Thread 1  |-------------------------->| writing to MARKERS1
+ * Thread 2        |-------------------------->| writing to MARKERS2
+ * Thread 3               |-------------------------->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map<String, Set<String>> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map<String, Map<Integer, StringBuilder>> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List<CreateMarkerCompletableFuture> createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code BatchCreateMarkerRunnable}
+  private final List<Boolean> markerFilesUseStatus;
+  // Batch process interval in milliseconds
+  private final long batchIntervalMs;
+  // Parallelism for reading and deleting marker files
+  private final int parallelism;
+  private transient HoodieEngineContext hoodieEngineContext;
+  // Lock for synchronous processing of marker creating requests
+  private volatile Object createMarkerRequestLockObject = new Object();
+  // Next batch process timestamp in milliseconds
+  private long nextBatchProcessTimeMs = 0L;
+  // Last marker file index used, for finding the next marker file index in a 
round-robin fashion
+  private int lastMarkerFileIndex = 0;
+
+  public MarkerHandler(Configuration conf, HoodieEngineContext 
hoodieEngineContext, FileSystem fileSystem,
+                       FileSystemViewManager viewManager, Registry 
metricsRegistry,
+                       int batchNumThreads, long batchIntervalMs, int 
parallelism) throws IOException {
+    super(conf, fileSystem, viewManager);
+    LOG.info("MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+    LOG.info("MarkerHandler Params: batchNumThreads=" + batchNumThreads + " 
batchIntervalMs=" + batchIntervalMs + "ms");
+    this.hoodieEngineContext = hoodieEngineContext;
+    this.metricsRegistry = metricsRegistry;
+    this.batchIntervalMs = batchIntervalMs;
+    this.parallelism = parallelism;
+    this.executorService = Executors.newScheduledThreadPool(batchNumThreads);
+
+    List<Boolean> isMarkerFileInUseList = new ArrayList<>(batchNumThreads);
+    for (int i = 0; i < batchNumThreads; i++) {
+      isMarkerFileInUseList.add(false);
+    }
+    this.markerFilesUseStatus = 
Collections.synchronizedList(isMarkerFileInUseList);
+  }
+
+  /**
+   * @param markerDirPath marker directory path
+   * @return all marker paths in the marker directory
+   */
+  public Set<String> getAllMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>());
+  }
+
+  /**
+   * @param markerDirPath marker directory path
+   * @return all marker paths of write IO type "CREATE" and "MERGE"
+   */
+  public Set<String> getCreateAndMergeMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>()).stream()
+        .filter(markerName -> !markerName.endsWith(IOType.APPEND.name()))
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * @param markerDir  marker directory path
+   * @return {@code true} if the marker directory exists; {@code false} 
otherwise.
+   */
+  public boolean doesMarkerDirExist(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      return fileSystem.exists(markerDirPath);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Generates a future for an async marker creation request
+   *
+   * The future is added to the marker creation future list and waits for the 
next batch processing
+   * of marker creation requests.
+   *
+   * @param context Javalin app context
+   * @param markerDirPath marker directory path
+   * @param markerName marker name
+   * @return the {@code CompletableFuture} instance for the request
+   */
+  public CompletableFuture<String> createMarker(Context context, String 
markerDirPath, String markerName) {
+    LOG.info("Request: create marker " + markerDirPath + " " + markerName);
+    CreateMarkerCompletableFuture future = new 
CreateMarkerCompletableFuture(context, markerDirPath, markerName);
+    synchronized (createMarkerRequestLockObject) {
+      // Add the future to the list
+      createMarkerFutures.add(future);
+      // Update the next batch processing time and schedule the batch 
processing if necessary
+      long currTimeMs = System.currentTimeMillis();
+      // If the current request may miss the next batch processing, schedule a 
new batch processing thread
+      // A margin time is always considered for checking the timestamp to make 
sure no request is missed
+      if (currTimeMs >= nextBatchProcessTimeMs - SCHEDULING_MARGIN_TIME_MS) {
+        if (currTimeMs < nextBatchProcessTimeMs + batchIntervalMs - 
SCHEDULING_MARGIN_TIME_MS) {
+          // within the batch interval from the latest batch processing thread
+          // increment nextBatchProcessTimeMs by batchIntervalMs
+          nextBatchProcessTimeMs += batchIntervalMs;
+        } else {
+          // Otherwise, wait for batchIntervalMs based on the current timestamp
+          nextBatchProcessTimeMs = currTimeMs + batchIntervalMs;
+        }
+
+        long waitMs = nextBatchProcessTimeMs - System.currentTimeMillis();
+        executorService.schedule(
+            new BatchCreateMarkerRunnable(), Math.max(0L, waitMs), 
TimeUnit.MILLISECONDS);
+        LOG.info("Wait for " + waitMs + " ms, next batch time: " + 
nextBatchProcessTimeMs);
+      }
+    }
+    return future;
+  }
+
+  /**
+   * Deletes markers in the directory.
+   *
+   * @param markerDir marker directory path
+   * @return {@code true} if successful; {@code false} otherwise.
+   */
+  public Boolean deleteMarkers(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    boolean result = false;
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          SerializableConfiguration conf = new 
SerializableConfiguration(fileSystem.getConf());
+          int actualParallelism = Math.min(markerDirSubPaths.size(), 
parallelism);
+          hoodieEngineContext.foreach(markerDirSubPaths, subPathStr -> {
+            Path subPath = new Path(subPathStr);
+            FileSystem fileSystem = subPath.getFileSystem(conf.get());
+            fileSystem.delete(subPath, true);
+          }, actualParallelism);
+        }
+
+        result = fileSystem.delete(markerDirPath, true);
+        LOG.info("Removing marker directory at " + markerDirPath);
+      }
+      allMarkersMap.remove(markerDir);
+      fileMarkersMap.remove(markerDir);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+    return result;
+  }
+
+  /**
+   * Gets the marker file index from the marker file path.
+   *
+   * E.g., if the marker file path is /tmp/table/.hoodie/.temp/000/MARKERS3, 
the index returned is 3.
+   *
+   * @param markerFilePathStr full path of marker file
+   * @return the marker file index
+   */
+  private int getMarkerFileIndex(String markerFilePathStr) {
+    String markerFileName = new Path(markerFilePathStr).getName();
+    int prefixIndex = markerFileName.indexOf(MARKERS_FILENAME_PREFIX);
+    if (prefixIndex < 0) {
+      return -1;
+    }
+    try {
+      return Integer.parseInt(markerFileName.substring(prefixIndex + 
MARKERS_FILENAME_PREFIX.length()));
+    } catch (NumberFormatException nfe) {
+      nfe.printStackTrace();
+    }
+    return -1;
+  }
+
+  /**
+   * Syncs all markers maintained in the marker files from the marker 
directory in the file system.
+   *
+   * @param markerDir marker directory path
+   */
+  private void syncAllMarkersFromFile(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .filter(pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX))
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          SerializableConfiguration conf = new 
SerializableConfiguration(fileSystem.getConf());
+          int actualParallelism = Math.min(markerDirSubPaths.size(), 
parallelism);
+          Map<String, Set<String>> fileMarkersSetMap =
+              hoodieEngineContext.mapToPair(markerDirSubPaths, 
markersFilePathStr -> {
+                Path markersFilePath = new Path(markersFilePathStr);
+                FileSystem fileSystem = 
markersFilePath.getFileSystem(conf.get());
+                FSDataInputStream fsDataInputStream = null;
+                BufferedReader bufferedReader = null;
+                Set<String> markers = new HashSet<>();
+                try {
+                  LOG.info("Read marker file: " + markersFilePathStr);
+                  fsDataInputStream = fileSystem.open(markersFilePath);
+                  bufferedReader = new BufferedReader(new 
InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
+                  markers = bufferedReader.lines().collect(Collectors.toSet());
+                  bufferedReader.close();
+                  fsDataInputStream.close();
+                } catch (IOException e) {
+                  throw new HoodieIOException("Failed to read MARKERS file " + 
markerDirPath, e);
+                } finally {
+                  closeQuietly(bufferedReader);
+                  closeQuietly(fsDataInputStream);
+                }
+                return new ImmutablePair<>(markersFilePathStr, markers);
+              }, actualParallelism);
+
+          Set<String> allMarkers = new HashSet<>();
+          for (String markersFilePathStr: fileMarkersSetMap.keySet()) {
+            Set<String> fileMarkers = 
fileMarkersSetMap.get(markersFilePathStr);
+            if (!fileMarkers.isEmpty()) {
+              Map<Integer, StringBuilder> singleDirMarkersMap =
+                  fileMarkersMap.computeIfAbsent(markerDir, k -> new 
HashMap<>());
+              int index = getMarkerFileIndex(markersFilePathStr);
+
+              if (index >= 0) {
+                singleDirMarkersMap.put(index, new 
StringBuilder(StringUtils.join(",", fileMarkers)));
+                allMarkers.addAll(fileMarkers);
+              }
+            }
+          }
+          allMarkersMap.put(markerDir, allMarkers);
+        }
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Closes {@code Closeable} quietly.
+   *
+   * @param closeable {@code Closeable} to close
+   */
+  private void closeQuietly(Closeable closeable) {
+    if (closeable == null) {
+      return;
+    }
+    try {
+      closeable.close();
+    } catch (IOException e) {
+      LOG.warn("IOException during close", e);
+    }
+  }
+
+  /**
+   * Future for async marker creation request.
+   */
+  private class CreateMarkerCompletableFuture extends 
CompletableFuture<String> {
+    private final Context context;
+    private final String markerDirPath;
+    private final String markerName;
+    private boolean result;
+    private final long startTimeMs;
+
+    public CreateMarkerCompletableFuture(Context context, String 
markerDirPath, String markerName) {
+      super();
+      this.startTimeMs = System.currentTimeMillis();
+      this.context = context;
+      this.markerDirPath = markerDirPath;
+      this.markerName = markerName;
+      this.result = false;
+    }
+
+    public Context getContext() {
+      return context;
+    }
+
+    public String getMarkerDirPath() {
+      return markerDirPath;
+    }
+
+    public String getMarkerName() {
+      return markerName;
+    }
+
+    public boolean getResult() {
+      return result;
+    }
+
+    public void setResult(boolean result) {
+      LOG.info("Request queued for " + (System.currentTimeMillis() - 
startTimeMs) + " ms");

Review comment:
       Can we switch this to debug?
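
A minimal sketch of the requested change, using java.util.logging so that it runs without dependencies. The FINE level stands in for log4j's debug level, and QueueTimeLogging is a hypothetical name. The per-request message is only built and emitted when debug-level logging is enabled.

```java
import java.util.logging.Logger;

class QueueTimeLogging {
  static final Logger LOG = Logger.getLogger(QueueTimeLogging.class.getName());

  // Formats the per-request message; kept separate so it can be checked directly.
  static String queuedMessage(long startTimeMs, long nowMs) {
    return "Request queued for " + (nowMs - startTimeMs) + " ms";
  }

  // Logs at FINE, the java.util.logging stand-in for debug. The supplier form
  // defers string building until the logger has confirmed that FINE is enabled.
  static void logQueued(long startTimeMs, long nowMs) {
    LOG.fine(() -> queuedMessage(startTimeMs, nowMs));
  }
}
```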

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -423,7 +424,8 @@ protected void preWrite(String instantTime, 
WriteOperationType writeOperationTyp
   protected void postCommit(HoodieTable<T, I, K, O> table, 
HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> 
extraMetadata) {
     try {
       // Delete the marker directory for the instant.
-      new MarkerFiles(table, instantTime).quietDeleteMarkerDir(context, 
config.getMarkersDeleteParallelism());
+      MarkerFilesFactory.get(config.getMarkersIOMode(), table, instantTime)

Review comment:
       Something to remember:
   We might have to add an upgrade step for migration from the previous version 
to the new one. I guess it is as simple as just cleaning up the old marker files. 
But once we enable the new format for the first time, we should remember to clean 
up the older-format marker files.
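
The cleanup described here could look roughly like the sketch below, written against java.nio.file on a local directory rather than Hadoop's FileSystem. It assumes that new-format files are exactly the regular files directly under the marker directory whose names start with MARKERS (the prefix used by MarkerHandler in this PR) and that everything else there is an old-format marker. OldMarkerCleanup and deleteOldFormatMarkers are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OldMarkerCleanup {
  // Prefix of the new-format marker files, as defined in MarkerHandler in this PR.
  static final String MARKERS_FILENAME_PREFIX = "MARKERS";

  // Deletes every entry under markerDir except top-level MARKERS* files.
  // Returns the number of deleted files and directories.
  static int deleteOldFormatMarkers(Path markerDir) throws IOException {
    if (!Files.isDirectory(markerDir)) {
      return 0;
    }
    List<Path> toDelete;
    try (Stream<Path> paths = Files.walk(markerDir)) {
      toDelete = paths
          .filter(p -> !p.equals(markerDir))
          .filter(p -> !isNewFormatFile(markerDir, p))
          // Deepest paths first, so directories are empty by the time they are deleted.
          .sorted(Comparator.<Path>comparingInt(Path::getNameCount).reversed())
          .collect(Collectors.toList());
    }
    for (Path p : toDelete) {
      Files.delete(p);
    }
    return toDelete.size();
  }

  // A new-format file is a regular file directly under markerDir named MARKERS<N>.
  private static boolean isNewFormatFile(Path markerDir, Path p) {
    return markerDir.equals(p.getParent())
        && Files.isRegularFile(p)
        && p.getFileName().toString().startsWith(MARKERS_FILENAME_PREFIX);
  }
}
```

An actual upgrade step would also need to decide when this runs, for example only on the first write after the new marker mode is enabled, which this sketch does not address.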

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronously, while other types of 
requests
+ * are handled synchronously.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same time, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *           |-----| batch interval
+ * Thread 1  |-------------------------->| writing to MARKERS1
+ * Thread 2        |-------------------------->| writing to MARKERS2
+ * Thread 3               |-------------------------->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map<String, Set<String>> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map<String, Map<Integer, StringBuilder>> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List<CreateMarkerCompletableFuture> createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code BatchCreateMarkerRunnable}
+  private final List<Boolean> markerFilesUseStatus;
+  // Batch process interval in milliseconds
+  private final long batchIntervalMs;
+  // Parallelism for reading and deleting marker files
+  private final int parallelism;
+  private transient HoodieEngineContext hoodieEngineContext;
+  // Lock for synchronous processing of marker creating requests
+  private volatile Object createMarkerRequestLockObject = new Object();
+  // Next batch process timestamp in milliseconds
+  private long nextBatchProcessTimeMs = 0L;
+  // Last marker file index used, for finding the next marker file index in a 
round-robin fashion
+  private int lastMarkerFileIndex = 0;
+
+  public MarkerHandler(Configuration conf, HoodieEngineContext 
hoodieEngineContext, FileSystem fileSystem,
+                       FileSystemViewManager viewManager, Registry 
metricsRegistry,
+                       int batchNumThreads, long batchIntervalMs, int 
parallelism) throws IOException {
+    super(conf, fileSystem, viewManager);
+    LOG.info("MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+    LOG.info("MarkerHandler Params: batchNumThreads=" + batchNumThreads + " 
batchIntervalMs=" + batchIntervalMs + "ms");
+    this.hoodieEngineContext = hoodieEngineContext;
+    this.metricsRegistry = metricsRegistry;
+    this.batchIntervalMs = batchIntervalMs;
+    this.parallelism = parallelism;
+    this.executorService = Executors.newScheduledThreadPool(batchNumThreads);
+
+    List<Boolean> isMarkerFileInUseList = new ArrayList<>(batchNumThreads);
+    for (int i = 0; i < batchNumThreads; i++) {
+      isMarkerFileInUseList.add(false);
+    }
+    this.markerFilesUseStatus = 
Collections.synchronizedList(isMarkerFileInUseList);
+  }
+
+  /**
+   * @param markerDirPath marker directory path
+   * @return all marker paths in the marker directory
+   */
+  public Set<String> getAllMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>());
+  }
+
+  /**
+   * @param markerDirPath marker directory path
+   * @return all marker paths of write IO type "CREATE" and "MERGE"
+   */
+  public Set<String> getCreateAndMergeMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>()).stream()
+        .filter(markerName -> !markerName.endsWith(IOType.APPEND.name()))
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * @param markerDir  marker directory path
+   * @return {@code true} if the marker directory exists; {@code false} 
otherwise.
+   */
+  public boolean doesMarkerDirExist(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      return fileSystem.exists(markerDirPath);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Generates a future for an async marker creation request
+   *
+   * The future is added to the marker creation future list and waits for the 
next batch processing
+   * of marker creation requests.
+   *
+   * @param context Javalin app context
+   * @param markerDirPath marker directory path
+   * @param markerName marker name
+   * @return the {@code CompletableFuture} instance for the request
+   */
+  public CompletableFuture<String> createMarker(Context context, String 
markerDirPath, String markerName) {
+    LOG.info("Request: create marker " + markerDirPath + " " + markerName);
+    CreateMarkerCompletableFuture future = new 
CreateMarkerCompletableFuture(context, markerDirPath, markerName);
+    synchronized (createMarkerRequestLockObject) {
+      // Add the future to the list
+      createMarkerFutures.add(future);
+      // Update the next batch processing time and schedule the batch 
processing if necessary
+      long currTimeMs = System.currentTimeMillis();
+      // If the current request may miss the next batch processing, schedule a 
new batch processing thread
+      // A margin time is always considered for checking the timestamp to make 
sure no request is missed
+      if (currTimeMs >= nextBatchProcessTimeMs - SCHEDULING_MARGIN_TIME_MS) {
+        if (currTimeMs < nextBatchProcessTimeMs + batchIntervalMs - 
SCHEDULING_MARGIN_TIME_MS) {
+          // within the batch interval from the latest batch processing thread
+          // increment nextBatchProcessTimeMs by batchIntervalMs
+          nextBatchProcessTimeMs += batchIntervalMs;
+        } else {
+          // Otherwise, wait for batchIntervalMs based on the current timestamp
+          nextBatchProcessTimeMs = currTimeMs + batchIntervalMs;
+        }
+
+        long waitMs = nextBatchProcessTimeMs - System.currentTimeMillis();
+        executorService.schedule(
+            new BatchCreateMarkerRunnable(), Math.max(0L, waitMs), 
TimeUnit.MILLISECONDS);
+        LOG.info("Wait for " + waitMs + " ms, next batch time: " + 
nextBatchProcessTimeMs);
+      }
+    }
+    return future;
+  }
+
+  /**
+   * Deletes markers in the directory.
+   *
+   * @param markerDir marker directory path
+   * @return {@code true} if successful; {@code false} otherwise.
+   */
+  public Boolean deleteMarkers(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    boolean result = false;
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          SerializableConfiguration conf = new 
SerializableConfiguration(fileSystem.getConf());
+          int actualParallelism = Math.min(markerDirSubPaths.size(), 
parallelism);
+          hoodieEngineContext.foreach(markerDirSubPaths, subPathStr -> {
+            Path subPath = new Path(subPathStr);
+            FileSystem fileSystem = subPath.getFileSystem(conf.get());
+            fileSystem.delete(subPath, true);
+          }, actualParallelism);
+        }
+
+        result = fileSystem.delete(markerDirPath, true);
+        LOG.info("Removing marker directory at " + markerDirPath);
+      }
+      allMarkersMap.remove(markerDir);
+      fileMarkersMap.remove(markerDir);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+    return result;
+  }
+
+  /**
+   * Gets the marker file index from the marker file path.
+   *
+   * E.g., if the marker file path is /tmp/table/.hoodie/.temp/000/MARKERS3, 
the index returned is 3.
+   *
+   * @param markerFilePathStr full path of marker file
+   * @return the marker file index
+   */
+  private int getMarkerFileIndex(String markerFilePathStr) {
+    String markerFileName = new Path(markerFilePathStr).getName();
+    int prefixIndex = markerFileName.indexOf(MARKERS_FILENAME_PREFIX);
+    if (prefixIndex < 0) {
+      return -1;
+    }
+    try {
+      return Integer.parseInt(markerFileName.substring(prefixIndex + 
MARKERS_FILENAME_PREFIX.length()));
+    } catch (NumberFormatException nfe) {
+      nfe.printStackTrace();
+    }
+    return -1;
+  }
+
+  /**
+   * Syncs all markers maintained in the marker files from the marker 
directory in the file system.
+   *
+   * @param markerDir marker directory path
+   */
+  private void syncAllMarkersFromFile(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .filter(pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX))
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          SerializableConfiguration conf = new 
SerializableConfiguration(fileSystem.getConf());
+          int actualParallelism = Math.min(markerDirSubPaths.size(), 
parallelism);
+          Map<String, Set<String>> fileMarkersSetMap =
+              hoodieEngineContext.mapToPair(markerDirSubPaths, 
markersFilePathStr -> {
+                Path markersFilePath = new Path(markersFilePathStr);
+                FileSystem fileSystem = 
markersFilePath.getFileSystem(conf.get());
+                FSDataInputStream fsDataInputStream = null;
+                BufferedReader bufferedReader = null;
+                Set<String> markers = new HashSet<>();
+                try {
+                  LOG.info("Read marker file: " + markersFilePathStr);
+                  fsDataInputStream = fileSystem.open(markersFilePath);
+                  bufferedReader = new BufferedReader(new 
InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
+                  markers = bufferedReader.lines().collect(Collectors.toSet());
+                  bufferedReader.close();
+                  fsDataInputStream.close();
+                } catch (IOException e) {
+                  throw new HoodieIOException("Failed to read MARKERS file " + 
markerDirPath, e);
+                } finally {
+                  closeQuietly(bufferedReader);
+                  closeQuietly(fsDataInputStream);
+                }
+                return new ImmutablePair<>(markersFilePathStr, markers);
+              }, actualParallelism);
+
+          Set<String> allMarkers = new HashSet<>();
+          for (String markersFilePathStr: fileMarkersSetMap.keySet()) {
+            Set<String> fileMarkers = 
fileMarkersSetMap.get(markersFilePathStr);
+            if (!fileMarkers.isEmpty()) {
+              Map<Integer, StringBuilder> singleDirMarkersMap =
+                  fileMarkersMap.computeIfAbsent(markerDir, k -> new 
HashMap<>());
+              int index = getMarkerFileIndex(markersFilePathStr);
+
+              if (index >= 0) {
+                singleDirMarkersMap.put(index, new 
StringBuilder(StringUtils.join(",", fileMarkers)));
+                allMarkers.addAll(fileMarkers);
+              }
+            }
+          }
+          allMarkersMap.put(markerDir, allMarkers);
+        }
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Closes {@code Closeable} quietly.
+   *
+   * @param closeable {@code Closeable} to close
+   */
+  private void closeQuietly(Closeable closeable) {
+    if (closeable == null) {
+      return;
+    }
+    try {
+      closeable.close();
+    } catch (IOException e) {
+      LOG.warn("IOException during close", e);
+    }
+  }
+
+  /**
+   * Future for async marker creation request.
+   */
+  private class CreateMarkerCompletableFuture extends 
CompletableFuture<String> {
+    private final Context context;
+    private final String markerDirPath;
+    private final String markerName;
+    private boolean result;
+    private final long startTimeMs;
+
+    public CreateMarkerCompletableFuture(Context context, String 
markerDirPath, String markerName) {
+      super();
+      this.startTimeMs = System.currentTimeMillis();
+      this.context = context;
+      this.markerDirPath = markerDirPath;
+      this.markerName = markerName;
+      this.result = false;
+    }
+
+    public Context getContext() {
+      return context;
+    }
+
+    public String getMarkerDirPath() {
+      return markerDirPath;
+    }
+
+    public String getMarkerName() {
+      return markerName;
+    }
+
+    public boolean getResult() {
+      return result;
+    }
+
+    public void setResult(boolean result) {
+      LOG.info("Request queued for " + (System.currentTimeMillis() - 
startTimeMs) + " ms");
+      this.result = result;
+    }
+  }
+
+  /**
+   * A runnable for batch processing marker creation requests.
+   */
+  private class BatchCreateMarkerRunnable implements Runnable {
+
+    @Override
+    public void run() {
+      LOG.info("Start processing create marker requests");
+      long startTimeMs = System.currentTimeMillis();
+      List<CreateMarkerCompletableFuture> futuresToRemove = new ArrayList<>();
+      Set<String> updatedMarkerDirPaths = new HashSet<>();
+      int markerFileIndex = -1;
+      synchronized (markerFilesUseStatus) {
+        // Find the next marker file index to use in a round-robin fashion
+        for (int i = 0; i < markerFilesUseStatus.size(); i++) {
+          int index = (lastMarkerFileIndex + 1 + i) % 
markerFilesUseStatus.size();
+          if (!markerFilesUseStatus.get(index)) {
+            markerFileIndex = index;
+            markerFilesUseStatus.set(index, true);
+            break;
+          }
+        }
+        if (markerFileIndex < 0) {
+          LOG.info("All marker files are busy, skip batch processing of create 
marker requests in " + (System.currentTimeMillis() - startTimeMs) + " ms");
+          return;
+        }
+        lastMarkerFileIndex = markerFileIndex;
+      }
+
+      LOG.info("timeMs=" + System.currentTimeMillis() + " markerFileIndex=" + 
markerFileIndex);
+      synchronized (createMarkerRequestLockObject) {
+        LOG.info("Iterating through existing requests, size=" + 
createMarkerFutures.size());
+        for (CreateMarkerCompletableFuture future : createMarkerFutures) {
+          String markerDirPath = future.getMarkerDirPath();
+          String markerName = future.getMarkerName();
+          LOG.info("markerDirPath=" + markerDirPath + " markerName=" + 
markerName);
+          Set<String> allMarkers = 
allMarkersMap.computeIfAbsent(markerDirPath, k -> new HashSet<>());
+          boolean exists = allMarkers.contains(markerName);
+          if (!exists) {
+            allMarkers.add(markerName);
+            StringBuilder stringBuilder = 
fileMarkersMap.computeIfAbsent(markerDirPath, k -> new HashMap<>())
+                .computeIfAbsent(markerFileIndex, k -> new 
StringBuilder(16384));
+            stringBuilder.append(markerName);
+            stringBuilder.append('\n');
+            updatedMarkerDirPaths.add(markerDirPath);
+          }
+          future.setResult(!exists);
+          futuresToRemove.add(future);
+        }
+        createMarkerFutures.removeAll(futuresToRemove);
+      }
+      LOG.info("Flush to MARKERS file .. ");
+      flushMarkersToFile(updatedMarkerDirPaths, markerFileIndex);
+      markerFilesUseStatus.set(markerFileIndex, false);
+      LOG.info("Resolve request futures .. ");
+      for (CreateMarkerCompletableFuture future : futuresToRemove) {
+        try {
+          future.complete(jsonifyResult(future.getContext(), 
future.getResult(), metricsRegistry, OBJECT_MAPPER, LOG));
+        } catch (JsonProcessingException e) {
+          throw new HoodieException("Failed to JSON encode the value", e);
+        }
+      }
+      LOG.info("Finish batch processing of create marker requests in " + 
(System.currentTimeMillis() - startTimeMs) + " ms");
+    }
+
+    private void flushMarkersToFile(Set<String> updatedMarkerDirPaths, int 
markerFileIndex) {
+      long flushStartTimeMs = System.currentTimeMillis();
+      for (String markerDirPath : updatedMarkerDirPaths) {
+        LOG.info("Write to " + markerDirPath);
+        long startTimeMs = System.currentTimeMillis();
+        Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME_PREFIX 
+ markerFileIndex);
+        Path dirPath = markersFilePath.getParent();
+        try {
+          if (!fileSystem.exists(dirPath)) {
+            fileSystem.mkdirs(dirPath);
+          }
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to make dir " + dirPath, e);
+        }
+        FSDataOutputStream fsDataOutputStream = null;
+        BufferedWriter bufferedWriter = null;
+        try {
+          LOG.info("Create " + markersFilePath.toString());
+

Review comment:
       line break

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
##########
@@ -40,51 +41,50 @@
   private static final Logger LOG = 
LogManager.getLogger(EmbeddedTimelineService.class);
 
   private int serverPort;
-  private int preferredPort;
   private String hostAddr;
   private HoodieEngineContext context;
   private final SerializableConfiguration hadoopConf;
-  private final FileSystemViewStorageConfig config;
-  private final HoodieMetadataConfig metadataConfig;
+  private final HoodieWriteConfig writeConfig;
   private final String basePath;
 
-  private final int numThreads;
-  private final boolean shouldCompressOutput;
-  private final boolean useAsync;
   private transient FileSystemViewManager viewManager;
   private transient TimelineService server;
 
-  public EmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort,
-                                 HoodieMetadataConfig metadataConfig, 
FileSystemViewStorageConfig config, String basePath,
-                                 int numThreads, boolean compressOutput, 
boolean useAsync) {
+  public EmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
     setHostAddr(embeddedTimelineServiceHostAddr);
     this.context = context;
-    this.config = config;
-    this.basePath = basePath;
-    this.metadataConfig = metadataConfig;
+    this.writeConfig = writeConfig;
+    this.basePath = writeConfig.getBasePath();
     this.hadoopConf = context.getHadoopConf();
     this.viewManager = createViewManager();
-    this.preferredPort = embeddedTimelineServerPort;
-    this.numThreads = numThreads;
-    this.shouldCompressOutput = compressOutput;
-    this.useAsync = useAsync;
   }
 
   private FileSystemViewManager createViewManager() {
     // Using passed-in configs to build view storage configs
     FileSystemViewStorageConfig.Builder builder =
-        
FileSystemViewStorageConfig.newBuilder().fromProperties(config.getProps());
+        
FileSystemViewStorageConfig.newBuilder().fromProperties(writeConfig.getClientSpecifiedViewStorageConfig().getProps());
     FileSystemViewStorageType storageType = builder.build().getStorageType();
     if (storageType.equals(FileSystemViewStorageType.REMOTE_ONLY)
         || storageType.equals(FileSystemViewStorageType.REMOTE_FIRST)) {
       // Reset to default if set to Remote
       builder.withStorageType(FileSystemViewStorageType.MEMORY);
     }
-    return FileSystemViewManager.createViewManager(context, metadataConfig, 
builder.build(), basePath);
+    return FileSystemViewManager.createViewManager(context, 
writeConfig.getMetadataConfig(), builder.build(), basePath);
   }
 
   public void startServer() throws IOException {
-    server = new TimelineService(preferredPort, viewManager, 
hadoopConf.newCopy(), numThreads, shouldCompressOutput, useAsync);
+    TimelineService.Config timelineServiceConf = 
TimelineService.Config.builder()
+        .serverPort(writeConfig.getEmbeddedTimelineServerPort())
+        .numThreads(writeConfig.getEmbeddedTimelineServerThreads())
+        .compress(writeConfig.getEmbeddedTimelineServerCompressOutput())
+        .async(writeConfig.getEmbeddedTimelineServerUseAsync())
+        
.markerBatchNumThreads(writeConfig.getMarkersTimelineBasedBatchNumThreads())

Review comment:
       Do we instantiate these configs (getMarkersTimelineBasedBatchNumThreads, 
getMarkersTimelineBasedBatchIntervalMs, etc.) even if the timeline-based marker 
strategy is disabled?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
##########
@@ -177,6 +186,7 @@ public RemoteHoodieTableFileSystemView(String server, int 
port, HoodieTableMetaC
         break;
     }
     String content = response.returnContent().asString();
+    LOG.info("Got response in " + (System.currentTimeMillis() - startTimeMs) + 
" ms");

Review comment:
       Again, do we need to log the response time at INFO level for every 
request/response?
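
       One possible alternative, sketched below under assumptions: log the 
timing at a finer level and guard the call so the message string is only built 
when that level is enabled. The sketch uses java.util.logging as a stand-in so 
that it runs on its own; the class in the diff uses log4j, where the analogous 
calls would be LOG.isDebugEnabled() and LOG.debug(...). The names 
ResponseTimingSketch, timingMessage and logResponseTime are hypothetical and 
are not part of the PR.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in class; not part of the PR under review.
class ResponseTimingSketch {
  private static final Logger LOG = Logger.getLogger(ResponseTimingSketch.class.getName());

  // Builds the same message text as the LOG.info call in the diff.
  static String timingMessage(long startTimeMs, long nowMs) {
    return "Got response in " + (nowMs - startTimeMs) + " ms";
  }

  // Logs the timing only when FINE (roughly log4j's DEBUG) is enabled.
  // Returns true if a message was logged, false if it was skipped.
  static boolean logResponseTime(long startTimeMs) {
    if (LOG.isLoggable(Level.FINE)) {
      LOG.fine(timingMessage(startTimeMs, System.currentTimeMillis()));
      return true;
    }
    return false;
  }
}
```

       With a guard like this, per-request timings stay available when 
troubleshooting but are not emitted at the INFO level.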

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
##########
@@ -52,36 +53,50 @@
 
   private int serverPort;
   private Configuration conf;
+  private transient HoodieEngineContext context;
   private transient FileSystem fs;
   private transient Javalin app = null;
   private transient FileSystemViewManager fsViewsManager;
   private final int numThreads;
   private final boolean shouldCompressOutput;
   private final boolean useAsync;
+  private final int markerBatchNumThreads;
+  private final long markerBatchIntervalMs;
+  private final int markerDeleteParallelism;
 
   public int getServerPort() {
     return serverPort;
   }
 
-  public TimelineService(int serverPort, FileSystemViewManager 
globalFileSystemViewManager, Configuration conf,
-      int numThreads, boolean compressOutput, boolean useAsync) throws 
IOException {
+  private TimelineService(HoodieEngineContext context, int serverPort, 
FileSystem fs,

Review comment:
       same as above.

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
##########
@@ -118,8 +133,118 @@ public TimelineService(Config config) throws IOException {
     @Parameter(names = {"--compress"}, description = "Compress output using 
gzip")
     public boolean compress = true;
 
+    @Parameter(names = {"--marker-batch-threads", "-mbt"}, description = 
"Number of threads to use for batch processing marker creation requests")
+    public int markerBatchNumThreads = 20;
+
+    @Parameter(names = {"--marker-batch-interval-ms", "-mbi"}, description = 
"The interval in milliseconds between two batch processing of marker creation 
requests")
+    public long markerBatchIntervalMs = 50;
+
+    @Parameter(names = {"--marker-parallelism", "-mdp"}, description = 
"Parallelism to use for reading and deleting marker files")
+    public int markerParallelism = 100;
+
     @Parameter(names = {"--help", "-h"})
     public Boolean help = false;
+
+    public static Builder builder() {
+      return new Builder();
+    }
+
+    /**
+     * Builder of Config class.
+     */
+    public static class Builder {
+      private Integer serverPort = 26754;
+      private FileSystemViewStorageType viewStorageType = 
FileSystemViewStorageType.SPILLABLE_DISK;
+      private Integer maxViewMemPerTableInMB = 2048;
+      private Double memFractionForCompactionPerTable = 0.001;
+      private String baseStorePathForFileGroups = 
FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue();
+      private String rocksDBPath = 
FileSystemViewStorageConfig.ROCKSDB_BASE_PATH_PROP.defaultValue();
+      private int numThreads = DEFAULT_NUM_THREADS;
+      private boolean async = false;
+      private boolean compress = true;
+      private int markerBatchNumThreads = 20;

Review comment:
       I see you have hard-coded these in two places (see line 91). Can we declare 
constants and reuse them?
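
       A minimal sketch of what shared constants could look like, under 
assumptions: the constant names (DEFAULT_MARKER_BATCH_NUM_THREADS and so on) 
and the class name TimelineConfigSketch are hypothetical, the JCommander 
@Parameter annotations are omitted so the sketch compiles on its own, and the 
values are the defaults of the Config fields in the diff above.

```java
// Hypothetical stand-in for TimelineService.Config; not the actual class.
class TimelineConfigSketch {
  // Single source of truth for the marker-related defaults (names are assumptions).
  public static final int DEFAULT_MARKER_BATCH_NUM_THREADS = 20;
  public static final long DEFAULT_MARKER_BATCH_INTERVAL_MS = 50L;
  public static final int DEFAULT_MARKER_PARALLELISM = 100;

  // In the real class these fields carry @Parameter annotations.
  public int markerBatchNumThreads = DEFAULT_MARKER_BATCH_NUM_THREADS;
  public long markerBatchIntervalMs = DEFAULT_MARKER_BATCH_INTERVAL_MS;
  public int markerParallelism = DEFAULT_MARKER_PARALLELISM;

  public static Builder builder() {
    return new Builder();
  }

  // The builder reuses the same constants instead of repeating the literals.
  public static class Builder {
    private int markerBatchNumThreads = DEFAULT_MARKER_BATCH_NUM_THREADS;
    private long markerBatchIntervalMs = DEFAULT_MARKER_BATCH_INTERVAL_MS;
    private int markerParallelism = DEFAULT_MARKER_PARALLELISM;

    public Builder markerBatchNumThreads(int markerBatchNumThreads) {
      this.markerBatchNumThreads = markerBatchNumThreads;
      return this;
    }

    public Builder markerBatchIntervalMs(long markerBatchIntervalMs) {
      this.markerBatchIntervalMs = markerBatchIntervalMs;
      return this;
    }

    public Builder markerParallelism(int markerParallelism) {
      this.markerParallelism = markerParallelism;
      return this;
    }

    public TimelineConfigSketch build() {
      TimelineConfigSketch config = new TimelineConfigSketch();
      config.markerBatchNumThreads = this.markerBatchNumThreads;
      config.markerBatchIntervalMs = this.markerBatchIntervalMs;
      config.markerParallelism = this.markerParallelism;
      return config;
    }
  }
}
```

       With this shape, the command-line defaults and the builder defaults 
cannot drift apart, since both read the same constants.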

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronously, while other types of 
requests
+ * are handled synchronously.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same time, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *           |-----| batch interval
+ * Thread 1  |-------------------------->| writing to MARKERS1
+ * Thread 2        |-------------------------->| writing to MARKERS2
+ * Thread 3               |-------------------------->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map<String, Set<String>> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map<String, Map<Integer, StringBuilder>> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List<CreateMarkerCompletableFuture> createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code BatchCreateMarkerRunnable}
+  private final List<Boolean> markerFilesUseStatus;
+  // Batch process interval in milliseconds
+  private final long batchIntervalMs;
+  // Parallelism for reading and deleting marker files
+  private final int parallelism;
+  private transient HoodieEngineContext hoodieEngineContext;
+  // Lock for synchronous processing of marker creating requests
+  private volatile Object createMarkerRequestLockObject = new Object();
+  // Next batch process timestamp in milliseconds
+  private long nextBatchProcessTimeMs = 0L;
+  // Last marker file index used, for finding the next marker file index in a 
round-robin fashion
+  private int lastMarkerFileIndex = 0;
+
+  public MarkerHandler(Configuration conf, HoodieEngineContext 
hoodieEngineContext, FileSystem fileSystem,
+                       FileSystemViewManager viewManager, Registry 
metricsRegistry,
+                       int batchNumThreads, long batchIntervalMs, int 
parallelism) throws IOException {
+    super(conf, fileSystem, viewManager);
+    LOG.info("MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+    LOG.info("MarkerHandler Params: batchNumThreads=" + batchNumThreads + " 
batchIntervalMs=" + batchIntervalMs + "ms");
+    this.hoodieEngineContext = hoodieEngineContext;
+    this.metricsRegistry = metricsRegistry;
+    this.batchIntervalMs = batchIntervalMs;
+    this.parallelism = parallelism;
+    this.executorService = Executors.newScheduledThreadPool(batchNumThreads);
+
+    List<Boolean> isMarkerFileInUseList = new ArrayList<>(batchNumThreads);
+    for (int i = 0; i < batchNumThreads; i++) {
+      isMarkerFileInUseList.add(false);
+    }
+    this.markerFilesUseStatus = 
Collections.synchronizedList(isMarkerFileInUseList);
+  }
+
+  /**
+   * @param markerDirPath marker directory path
+   * @return all marker paths in the marker directory
+   */
+  public Set<String> getAllMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>());
+  }
+
+  /**
+   * @param markerDirPath marker directory path
+   * @return all marker paths of write IO type "CREATE" and "MERGE"
+   */
+  public Set<String> getCreateAndMergeMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>()).stream()
+        .filter(markerName -> !markerName.endsWith(IOType.APPEND.name()))
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * @param markerDir  marker directory path
+   * @return {@code true} if the marker directory exists; {@code false} 
otherwise.
+   */
+  public boolean doesMarkerDirExist(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      return fileSystem.exists(markerDirPath);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Generates a future for an async marker creation request
+   *
+   * The future is added to the marker creation future list and waits for the 
next batch processing
+   * of marker creation requests.
+   *
+   * @param context Javalin app context
+   * @param markerDirPath marker directory path
+   * @param markerName marker name
+   * @return the {@code CompletableFuture} instance for the request
+   */
+  public CompletableFuture<String> createMarker(Context context, String 
markerDirPath, String markerName) {
+    LOG.info("Request: create marker " + markerDirPath + " " + markerName);
+    CreateMarkerCompletableFuture future = new 
CreateMarkerCompletableFuture(context, markerDirPath, markerName);
+    synchronized (createMarkerRequestLockObject) {
+      // Add the future to the list
+      createMarkerFutures.add(future);
+      // Update the next batch processing time and schedule the batch 
processing if necessary
+      long currTimeMs = System.currentTimeMillis();
+      // If the current request may miss the next batch processing, schedule a 
new batch processing thread
+      // A margin time is always considered for checking the timestamp to make 
sure no request is missed
+      if (currTimeMs >= nextBatchProcessTimeMs - SCHEDULING_MARGIN_TIME_MS) {
+        if (currTimeMs < nextBatchProcessTimeMs + batchIntervalMs - 
SCHEDULING_MARGIN_TIME_MS) {
+          // within the batch interval from the latest batch processing thread
+          // increment nextBatchProcessTimeMs by batchIntervalMs
+          nextBatchProcessTimeMs += batchIntervalMs;
+        } else {
+          // Otherwise, wait for batchIntervalMs based on the current timestamp
+          nextBatchProcessTimeMs = currTimeMs + batchIntervalMs;
+        }
+
+        long waitMs = nextBatchProcessTimeMs - System.currentTimeMillis();
+        executorService.schedule(
+            new BatchCreateMarkerRunnable(), Math.max(0L, waitMs), 
TimeUnit.MILLISECONDS);
+        LOG.info("Wait for " + waitMs + " ms, next batch time: " + 
nextBatchProcessTimeMs);
+      }
+    }
+    return future;
+  }
+
+  /**
+   * Deletes markers in the directory.
+   *
+   * @param markerDir marker directory path
+   * @return {@code true} if successful; {@code false} otherwise.
+   */
+  public Boolean deleteMarkers(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          SerializableConfiguration conf = new 
SerializableConfiguration(fileSystem.getConf());
+          int actualParallelism = Math.min(markerDirSubPaths.size(), 
parallelism);
+          hoodieEngineContext.foreach(markerDirSubPaths, subPathStr -> {
+            Path subPath = new Path(subPathStr);
+            FileSystem fileSystem = subPath.getFileSystem(conf.get());
+            fileSystem.delete(subPath, true);
+          }, actualParallelism);
+        }
+
+        boolean result = fileSystem.delete(markerDirPath, true);
+        LOG.info("Removing marker directory at " + markerDirPath);
+        return result;
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+    return false;
+  }
+
+  /**
+   * Gets the marker file index from the marker file path.
+   *
+   * E.g., if the marker file path is /tmp/table/.hoodie/.temp/000/MARKERS3, 
the index returned is 3.
+   *
+   * @param markerFilePathStr full path of marker file
+   * @return the marker file index
+   */
+  private int getMarkerFileIndex(String markerFilePathStr) {
+    String markerFileName = new Path(markerFilePathStr).getName();
+    int prefixIndex = markerFileName.indexOf(MARKERS_FILENAME_PREFIX);
+    if (prefixIndex < 0) {
+      return -1;
+    }
+    try {
+      return Integer.parseInt(markerFileName.substring(prefixIndex + 
MARKERS_FILENAME_PREFIX.length()));
+    } catch (NumberFormatException nfe) {
+      nfe.printStackTrace();
+    }
+    return -1;
+  }
+
+  /**
+   * Syncs all markers maintained in the marker files from the marker 
directory in the file system.
+   *
+   * @param markerDir marker directory path
+   */
+  private void syncAllMarkersFromFile(String markerDir) {

Review comment:
       Yeah, I could not think of other ways. Will see if Vinoth has any 
suggestions.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerFiles.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Operates on marker files for a given write action (commit, delta commit, 
compaction).
+ *
+ * This abstract class provides abstract methods of different marker file 
operations, so that
+ * different marker file mechanisms can be implemented.
+ */
+public abstract class MarkerFiles implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);
+
+  protected final String basePath;
+  protected final transient Path markerDirPath;
+  protected final String instantTime;
+
+  public MarkerFiles(String basePath, String markerFolderPath, String 
instantTime) {
+    this.basePath = basePath;
+    this.markerDirPath = new Path(markerFolderPath);
+    this.instantTime = instantTime;
+  }
+
+  /**
+   * The marker path will be 
<base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.

Review comment:
       The format is applicable only for "Direct" mode, right?
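
       For context, a minimal sketch of how a path of the documented shape could be composed in direct mode. The class and method names here are hypothetical and not part of Hudi; only the layout `<base-path>/.hoodie/.temp/<instant_ts>/<partition>/<file>.marker.<IOType>` is taken from the Javadoc above.

```java
// Hypothetical helper (not Hudi code): builds a direct-mode marker path
// following the layout described in the Javadoc under review.
final class DirectMarkerPathSketch {

  static String markerPath(String basePath, String instantTime, String partitionPath,
                           String dataFileName, String ioType) {
    // Marker folder for one write action: <base-path>/.hoodie/.temp/<instant_ts>
    String markerFolder = basePath + "/.hoodie/.temp/" + instantTime;
    // One marker per data file: <partition>/<file>.marker.<IOType>
    return markerFolder + "/" + partitionPath + "/" + dataFileName + ".marker." + ioType;
  }

  public static void main(String[] args) {
    System.out.println(markerPath("/tmp/table", "000", "2019/04/25", "filename", "CREATE"));
  }
}
```

       In a timeline-server-based mode, the same marker name would presumably be stored as an entry inside a MARKERS file rather than as a file at this path, which is why the Javadoc may need to be scoped to direct mode.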

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronously, while other types of 
requests
+ * are handled synchronously.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same time, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *           |-----| batch interval
+ * Thread 1  |-------------------------->| writing to MARKERS1
+ * Thread 2        |-------------------------->| writing to MARKERS2
+ * Thread 3               |-------------------------->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map<String, Set<String>> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map<String, Map<Integer, StringBuilder>> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List<CreateMarkerCompletableFuture> createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code BatchCreateMarkerRunnable}
+  private final List<Boolean> markerFilesUseStatus;
+  // Batch process interval in milliseconds
+  private final long batchIntervalMs;
+  // Parallelism for reading and deleting marker files
+  private final int parallelism;
+  private transient HoodieEngineContext hoodieEngineContext;
+  // Lock for synchronous processing of marker creating requests
+  private volatile Object createMarkerRequestLockObject = new Object();
+  // Next batch process timestamp in milliseconds
+  private long nextBatchProcessTimeMs = 0L;
+  // Last marker file index used, for finding the next marker file index in a 
round-robin fashion
+  private int lastMarkerFileIndex = 0;
+
+  public MarkerHandler(Configuration conf, HoodieEngineContext 
hoodieEngineContext, FileSystem fileSystem,
+                       FileSystemViewManager viewManager, Registry 
metricsRegistry,
+                       int batchNumThreads, long batchIntervalMs, int 
parallelism) throws IOException {
+    super(conf, fileSystem, viewManager);
+    LOG.info("MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+    LOG.info("MarkerHandler Params: batchNumThreads=" + batchNumThreads + " 
batchIntervalMs=" + batchIntervalMs + "ms");
+    this.hoodieEngineContext = hoodieEngineContext;
+    this.metricsRegistry = metricsRegistry;
+    this.batchIntervalMs = batchIntervalMs;
+    this.parallelism = parallelism;
+    this.executorService = Executors.newScheduledThreadPool(batchNumThreads);
+
+    List<Boolean> isMarkerFileInUseList = new ArrayList<>(batchNumThreads);
+    for (int i = 0; i < batchNumThreads; i++) {
+      isMarkerFileInUseList.add(false);
+    }
+    this.markerFilesUseStatus = 
Collections.synchronizedList(isMarkerFileInUseList);
+  }
+
+  /**
+   * @param markerDirPath marker directory path
+   * @return all marker paths in the marker directory
+   */
+  public Set<String> getAllMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>());
+  }
+
+  /**
+   * @param markerDirPath marker directory path
+   * @return all marker paths of write IO type "CREATE" and "MERGE"
+   */
+  public Set<String> getCreateAndMergeMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>()).stream()
+        .filter(markerName -> !markerName.endsWith(IOType.APPEND.name()))
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * @param markerDir  marker directory path
+   * @return {@code true} if the marker directory exists; {@code false} 
otherwise.
+   */
+  public boolean doesMarkerDirExist(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      return fileSystem.exists(markerDirPath);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Generates a future for an async marker creation request
+   *
+   * The future is added to the marker creation future list and waits for the 
next batch processing
+   * of marker creation requests.
+   *
+   * @param context Javalin app context
+   * @param markerDirPath marker directory path
+   * @param markerName marker name
+   * @return the {@code CompletableFuture} instance for the request
+   */
+  public CompletableFuture<String> createMarker(Context context, String 
markerDirPath, String markerName) {
+    LOG.info("Request: create marker " + markerDirPath + " " + markerName);
+    CreateMarkerCompletableFuture future = new 
CreateMarkerCompletableFuture(context, markerDirPath, markerName);
+    synchronized (createMarkerRequestLockObject) {
+      // Add the future to the list
+      createMarkerFutures.add(future);
+      // Update the next batch processing time and schedule the batch 
processing if necessary
+      long currTimeMs = System.currentTimeMillis();
+      // If the current request may miss the next batch processing, schedule a 
new batch processing thread
+      // A margin time is always considered for checking the timestamp to make 
sure no request is missed
+      if (currTimeMs >= nextBatchProcessTimeMs - SCHEDULING_MARGIN_TIME_MS) {
+        if (currTimeMs < nextBatchProcessTimeMs + batchIntervalMs - 
SCHEDULING_MARGIN_TIME_MS) {
+          // within the batch interval from the latest batch processing thread
+          // increment nextBatchProcessTimeMs by batchIntervalMs
+          nextBatchProcessTimeMs += batchIntervalMs;
+        } else {
+          // Otherwise, wait for batchIntervalMs based on the current timestamp
+          nextBatchProcessTimeMs = currTimeMs + batchIntervalMs;
+        }
+
+        long waitMs = nextBatchProcessTimeMs - System.currentTimeMillis();
+        executorService.schedule(
+            new BatchCreateMarkerRunnable(), Math.max(0L, waitMs), 
TimeUnit.MILLISECONDS);
+        LOG.info("Wait for " + waitMs + " ms, next batch time: " + 
nextBatchProcessTimeMs);
+      }
+    }
+    return future;
+  }
+
+  /**
+   * Deletes markers in the directory.
+   *
+   * @param markerDir marker directory path
+   * @return {@code true} if successful; {@code false} otherwise.
+   */
+  public Boolean deleteMarkers(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    boolean result = false;
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          SerializableConfiguration conf = new 
SerializableConfiguration(fileSystem.getConf());
+          int actualParallelism = Math.min(markerDirSubPaths.size(), 
parallelism);
+          hoodieEngineContext.foreach(markerDirSubPaths, subPathStr -> {
+            Path subPath = new Path(subPathStr);
+            FileSystem fileSystem = subPath.getFileSystem(conf.get());
+            fileSystem.delete(subPath, true);
+          }, actualParallelism);
+        }
+
+        result = fileSystem.delete(markerDirPath, true);
+        LOG.info("Removing marker directory at " + markerDirPath);
+      }
+      allMarkersMap.remove(markerDir);
+      fileMarkersMap.remove(markerDir);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+    return result;
+  }
+
+  /**
+   * Gets the marker file index from the marker file path.
+   *
+   * E.g., if the marker file path is /tmp/table/.hoodie/.temp/000/MARKERS3, 
the index returned is 3.
+   *
+   * @param markerFilePathStr full path of marker file
+   * @return the marker file index
+   */
+  private int getMarkerFileIndex(String markerFilePathStr) {
+    String markerFileName = new Path(markerFilePathStr).getName();
+    int prefixIndex = markerFileName.indexOf(MARKERS_FILENAME_PREFIX);
+    if (prefixIndex < 0) {
+      return -1;
+    }
+    try {
+      return Integer.parseInt(markerFileName.substring(prefixIndex + 
MARKERS_FILENAME_PREFIX.length()));
+    } catch (NumberFormatException nfe) {
+      nfe.printStackTrace();
+    }
+    return -1;
+  }
+
+  /**
+   * Syncs all markers maintained in the marker files from the marker 
directory in the file system.
+   *
+   * @param markerDir marker directory path
+   */
+  private void syncAllMarkersFromFile(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .filter(pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX))
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          SerializableConfiguration conf = new 
SerializableConfiguration(fileSystem.getConf());
+          int actualParallelism = Math.min(markerDirSubPaths.size(), 
parallelism);
+          Map<String, Set<String>> fileMarkersSetMap =
+              hoodieEngineContext.mapToPair(markerDirSubPaths, 
markersFilePathStr -> {
+                Path markersFilePath = new Path(markersFilePathStr);
+                FileSystem fileSystem = 
markersFilePath.getFileSystem(conf.get());
+                FSDataInputStream fsDataInputStream = null;
+                BufferedReader bufferedReader = null;
+                Set<String> markers = new HashSet<>();
+                try {
+                  LOG.info("Read marker file: " + markersFilePathStr);
+                  fsDataInputStream = fileSystem.open(markersFilePath);
+                  bufferedReader = new BufferedReader(new 
InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
+                  markers = bufferedReader.lines().collect(Collectors.toSet());
+                  bufferedReader.close();
+                  fsDataInputStream.close();
+                } catch (IOException e) {
+                  throw new HoodieIOException("Failed to read MARKERS file " + 
markerDirPath, e);
+                } finally {
+                  closeQuietly(bufferedReader);
+                  closeQuietly(fsDataInputStream);
+                }
+                return new ImmutablePair<>(markersFilePathStr, markers);
+              }, actualParallelism);
+
+          Set<String> allMarkers = new HashSet<>();
+          for (String markersFilePathStr: fileMarkersSetMap.keySet()) {
+            Set<String> fileMarkers = 
fileMarkersSetMap.get(markersFilePathStr);
+            if (!fileMarkers.isEmpty()) {
+              Map<Integer, StringBuilder> singleDirMarkersMap =
+                  fileMarkersMap.computeIfAbsent(markerDir, k -> new 
HashMap<>());
+              int index = getMarkerFileIndex(markersFilePathStr);
+
+              if (index >= 0) {
+                singleDirMarkersMap.put(index, new 
StringBuilder(StringUtils.join(",", fileMarkers)));
+                allMarkers.addAll(fileMarkers);
+              }
+            }
+          }
+          allMarkersMap.put(markerDir, allMarkers);
+        }
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Closes {@code Closeable} quietly.
+   *
+   * @param closeable {@code Closeable} to close
+   */
+  private void closeQuietly(Closeable closeable) {
+    if (closeable == null) {
+      return;
+    }
+    try {
+      closeable.close();
+    } catch (IOException e) {
+      LOG.warn("IOException during close", e);
+    }
+  }
+
+  /**
+   * Future for async marker creation request.
+   */
+  private class CreateMarkerCompletableFuture extends 
CompletableFuture<String> {
+    private final Context context;
+    private final String markerDirPath;
+    private final String markerName;
+    private boolean result;
+    private final long startTimeMs;
+
+    public CreateMarkerCompletableFuture(Context context, String 
markerDirPath, String markerName) {
+      super();
+      this.startTimeMs = System.currentTimeMillis();
+      this.context = context;
+      this.markerDirPath = markerDirPath;
+      this.markerName = markerName;
+      this.result = false;
+    }
+
+    public Context getContext() {
+      return context;
+    }
+
+    public String getMarkerDirPath() {
+      return markerDirPath;
+    }
+
+    public String getMarkerName() {
+      return markerName;
+    }
+
+    public boolean getResult() {
+      return result;
+    }
+
+    public void setResult(boolean result) {
+      LOG.info("Request queued for " + (System.currentTimeMillis() - 
startTimeMs) + " ms");
+      this.result = result;
+    }
+  }
+
+  /**
+   * A runnable for batch processing marker creation requests.
+   */
+  private class BatchCreateMarkerRunnable implements Runnable {
+
+    @Override
+    public void run() {
+      LOG.info("Start processing create marker requests");
+      long startTimeMs = System.currentTimeMillis();
+      List<CreateMarkerCompletableFuture> futuresToRemove = new ArrayList<>();
+      Set<String> updatedMarkerDirPaths = new HashSet<>();
+      int markerFileIndex = -1;
+      synchronized (markerFilesUseStatus) {
+        // Find the next marker file index to use in a round-robin fashion
+        for (int i = 0; i < markerFilesUseStatus.size(); i++) {
+          int index = (lastMarkerFileIndex + 1 + i) % 
markerFilesUseStatus.size();
+          if (!markerFilesUseStatus.get(index)) {
+            markerFileIndex = index;
+            markerFilesUseStatus.set(index, true);
+            break;
+          }
+        }
+        if (markerFileIndex < 0) {
+          LOG.info("All marker files are busy, skip batch processing of create 
marker requests in " + (System.currentTimeMillis() - startTimeMs) + " ms");
+          return;
+        }
+        lastMarkerFileIndex = markerFileIndex;
+      }
+
+      LOG.info("timeMs=" + System.currentTimeMillis() + " markerFileIndex=" + 
markerFileIndex);
+      synchronized (createMarkerRequestLockObject) {
+        LOG.info("Iterating through existing requests, size=" + 
createMarkerFutures.size());
+        for (CreateMarkerCompletableFuture future : createMarkerFutures) {
+          String markerDirPath = future.getMarkerDirPath();
+          String markerName = future.getMarkerName();
+          LOG.info("markerDirPath=" + markerDirPath + " markerName=" + 
markerName);
+          Set<String> allMarkers = 
allMarkersMap.computeIfAbsent(markerDirPath, k -> new HashSet<>());
+          boolean exists = allMarkers.contains(markerName);
+          if (!exists) {
+            allMarkers.add(markerName);
+            StringBuilder stringBuilder = 
fileMarkersMap.computeIfAbsent(markerDirPath, k -> new HashMap<>())
+                .computeIfAbsent(markerFileIndex, k -> new 
StringBuilder(16384));
+            stringBuilder.append(markerName);
+            stringBuilder.append('\n');
+            updatedMarkerDirPaths.add(markerDirPath);
+          }
+          future.setResult(!exists);
+          futuresToRemove.add(future);
+        }
+        createMarkerFutures.removeAll(futuresToRemove);
+      }
+      LOG.info("Flush to MARKERS file .. ");
+      flushMarkersToFile(updatedMarkerDirPaths, markerFileIndex);
+      markerFilesUseStatus.set(markerFileIndex, false);
+      LOG.info("Resolve request futures .. ");
+      for (CreateMarkerCompletableFuture future : futuresToRemove) {
+        try {
+          future.complete(jsonifyResult(future.getContext(), 
future.getResult(), metricsRegistry, OBJECT_MAPPER, LOG));
+        } catch (JsonProcessingException e) {
+          throw new HoodieException("Failed to JSON encode the value", e);
+        }
+      }
+      LOG.info("Finish batch processing of create marker requests in " + 
(System.currentTimeMillis() - startTimeMs) + " ms");
+    }
+
+    private void flushMarkersToFile(Set<String> updatedMarkerDirPaths, int 
markerFileIndex) {
+      long flushStartTimeMs = System.currentTimeMillis();
+      for (String markerDirPath : updatedMarkerDirPaths) {
+        LOG.info("Write to " + markerDirPath);
+        long startTimeMs = System.currentTimeMillis();
+        Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME_PREFIX 
+ markerFileIndex);
+        Path dirPath = markersFilePath.getParent();
+        try {
+          if (!fileSystem.exists(dirPath)) {
+            fileSystem.mkdirs(dirPath);
+          }
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to make dir " + dirPath, e);
+        }
+        FSDataOutputStream fsDataOutputStream = null;
+        BufferedWriter bufferedWriter = null;
+        try {
+          LOG.info("Create " + markersFilePath.toString());
+
+          fsDataOutputStream = fileSystem.create(markersFilePath);
+          bufferedWriter = new BufferedWriter(new 
OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
+          
bufferedWriter.write(fileMarkersMap.get(markerDirPath).get(markerFileIndex).toString());
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to overwrite marker file " + 
markersFilePath, e);
+        } finally {
+          closeQuietly(bufferedWriter);
+          closeQuietly(fsDataOutputStream);
+        }
+        LOG.info(markersFilePath.toString() + " written in " + 
(System.currentTimeMillis() - startTimeMs) + " ms");

Review comment:
       Can you revisit all logging and fix the info and debug levels as appropriate?
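
       To illustrate the kind of split meant here, a minimal sketch that logs per-request details at debug level and keeps one summary line per batch at info level. It uses java.util.logging (where FINE plays the role of debug) instead of log4j so that it is self-contained; the class and method names are hypothetical and not taken from the PR.

```java
import java.util.List;
import java.util.logging.Logger;

// Hypothetical sketch (not Hudi code): per-request detail at debug (FINE),
// one summary per batch at info.
final class MarkerLoggingSketch {
  private static final Logger LOG = Logger.getLogger("MarkerLoggingSketch");

  static int processBatch(List<String> markerNames) {
    for (String markerName : markerNames) {
      // High-volume, per-request message: only emitted when debug logging is on.
      LOG.fine(() -> "markerName=" + markerName);
    }
    // Low-volume summary: useful in production logs.
    LOG.info("Processed " + markerNames.size() + " create marker requests");
    return markerNames.size();
  }
}
```

       With the logger at its default INFO level, a batch of any size produces a single log line.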






> Re-implement marker files via timeline server
> ---------------------------------------------
>
>                 Key: HUDI-1138
>                 URL: https://issues.apache.org/jira/browse/HUDI-1138
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Vinoth Chandar
>            Assignee: Ethan Guo
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>
> Even if you can argue that RFC-15/consolidated metadata removes the need for 
> deleting partial files written due to Spark task failures/stage retries, it 
> will still leave extra files inside the table (and users will pay for them 
> every month), so we need the marker mechanism to be able to delete these 
> partial files. 
> Here we explore whether we can improve the current marker file mechanism, which 
> creates one marker file per data file written, by 
> delegating the createMarker() call to the driver/timeline server and having it 
> write marker metadata into a single file handle that is flushed for 
> durability guarantees.
>  
> P.S.: I was tempted to think the Spark listener mechanism can help us deal with 
> failed tasks, but it has no guarantees. The writer job could die without 
> deleting a partial file, i.e., it can improve things but can't provide 
> guarantees. 
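
The delegation idea in the description above can be sketched roughly as follows. This is a simplified, single-process illustration under stated assumptions, not the implementation in the pull request: it uses java.nio on a local file system instead of the Hadoop FileSystem API, a single file named MARKERS0, and hypothetical class and method names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch (not Hudi code): one process owns all marker entries
// for a marker directory and persists them in a single file.
final class SingleFileMarkerSketch {
  private final Path markersFile;
  // In-memory copy of all markers; insertion order is kept for readability.
  private final Set<String> markers = new LinkedHashSet<>();

  SingleFileMarkerSketch(Path markerDir) {
    this.markersFile = markerDir.resolve("MARKERS0");
  }

  // Returns true if the marker is new, false if it was already registered.
  synchronized boolean createMarker(String markerName) {
    return markers.add(markerName);
  }

  // Overwrites the single marker file with one marker name per line.
  synchronized void flush() {
    try {
      Files.createDirectories(markersFile.getParent());
      Files.write(markersFile, markers, StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to write " + markersFile, e);
    }
  }

  // Reads the persisted markers back, e.g. after a restart.
  List<String> readMarkers() {
    try {
      return Files.readAllLines(markersFile, StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to read " + markersFile, e);
    }
  }
}
```

Compared with one marker file per data file, this trades many small file creations for a single rewrite per flush; batching several requests before each flush, as the handler in the pull request does, amortizes that cost further.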


