[ 
https://issues.apache.org/jira/browse/HUDI-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17384172#comment-17384172
 ] 

ASF GitHub Bot commented on HUDI-1138:
--------------------------------------

yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r672869447



##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  private static final long MARGIN_TIME_MS = 10L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // {markerDirPath -> all markers}
+  private final Map<String, Set<String>> allMarkersMap = new HashMap<>();
+  // {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map<String, Map<Integer, StringBuilder>> fileMarkersMap = new 
HashMap<>();
+  private final List<CreateMarkerCompletableFuture> createMarkerFutures = new 
ArrayList<>();
+  private final List<Boolean> isMarkerFileInUseList;
+  private final long batchIntervalMs;
+  private final int parallelism;
+  private volatile Object createMarkerRequestlockObject = new Object();
+  private long nextBatchProcessTimeMs = 0L;
+
+  public MarkerHandler(Configuration conf, FileSystem fileSystem, 
FileSystemViewManager viewManager, Registry metricsRegistry,
+                       int batchNumThreads, long batchIntervalMs, int 
parallelism) throws IOException {
+    super(conf, fileSystem, viewManager);
+    LOG.info("*** MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+    LOG.info("*** MarkerHandler Params: batchNumThreads=" + batchNumThreads + 
" batchIntervalMs=" + batchIntervalMs + "ms");
+    this.metricsRegistry = metricsRegistry;
+    this.batchIntervalMs = batchIntervalMs;
+    this.parallelism = parallelism;
+    this.executorService = Executors.newScheduledThreadPool(batchNumThreads);
+    List<Boolean> isMarkerFileInUseList = new ArrayList<>(batchNumThreads);
+    for (int i = 0; i < batchNumThreads; i++) {
+      isMarkerFileInUseList.add(false);
+    }
+    this.isMarkerFileInUseList = 
Collections.synchronizedList(isMarkerFileInUseList);
+  }
+
+  public Set<String> getAllMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>());
+  }
+
+  public Set<String> getCreateAndMergeMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>()).stream()
+        .filter(markerName -> !markerName.endsWith(IOType.APPEND.name()))
+        .collect(Collectors.toSet());
+  }
+
+  public CompletableFuture<String> createMarker(Context context, String 
markerDirPath, String markerName) {
+    LOG.info("Request: create marker " + markerDirPath + " " + markerName);
+    CreateMarkerCompletableFuture future = new 
CreateMarkerCompletableFuture(context, markerDirPath, markerName);
+    synchronized (createMarkerRequestlockObject) {
+      createMarkerFutures.add(future);
+      long currTimeMs = System.currentTimeMillis();
+      if (currTimeMs >= nextBatchProcessTimeMs - MARGIN_TIME_MS) {
+        nextBatchProcessTimeMs += batchIntervalMs * (Math.max((currTimeMs - 
nextBatchProcessTimeMs) / batchIntervalMs + 1L, 1L));
+
+        long waitMs = nextBatchProcessTimeMs - System.currentTimeMillis();
+        executorService.schedule(
+            new BatchCreateMarkerRunnable(), Math.max(0L, waitMs), 
TimeUnit.MILLISECONDS);
+        LOG.info("Wait for " + waitMs + " ms, next batch time: " + 
nextBatchProcessTimeMs);
+      }
+    }
+    return future;
+  }
+
+  public Boolean deleteMarkers(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          for (String subPathStr: markerDirSubPaths) {
+            fileSystem.delete(new Path(subPathStr), true);
+          }
+        }
+
+        boolean result = fileSystem.delete(markerDirPath, true);

Review comment:
       I've updated the logic here.  Deletion of the marker files in the directory 
is parallelized to be more efficient; to do that, we first need to list the 
sub-paths in the directory, i.e., the marker file paths.

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  private static final long MARGIN_TIME_MS = 10L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // {markerDirPath -> all markers}
+  private final Map<String, Set<String>> allMarkersMap = new HashMap<>();
+  // {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map<String, Map<Integer, StringBuilder>> fileMarkersMap = new 
HashMap<>();
+  private final List<CreateMarkerCompletableFuture> createMarkerFutures = new 
ArrayList<>();
+  private final List<Boolean> isMarkerFileInUseList;
+  private final long batchIntervalMs;
+  private final int parallelism;
+  private volatile Object createMarkerRequestlockObject = new Object();
+  private long nextBatchProcessTimeMs = 0L;
+
+  public MarkerHandler(Configuration conf, FileSystem fileSystem, 
FileSystemViewManager viewManager, Registry metricsRegistry,
+                       int batchNumThreads, long batchIntervalMs, int 
parallelism) throws IOException {
+    super(conf, fileSystem, viewManager);
+    LOG.info("*** MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+    LOG.info("*** MarkerHandler Params: batchNumThreads=" + batchNumThreads + 
" batchIntervalMs=" + batchIntervalMs + "ms");
+    this.metricsRegistry = metricsRegistry;
+    this.batchIntervalMs = batchIntervalMs;
+    this.parallelism = parallelism;
+    this.executorService = Executors.newScheduledThreadPool(batchNumThreads);
+    List<Boolean> isMarkerFileInUseList = new ArrayList<>(batchNumThreads);
+    for (int i = 0; i < batchNumThreads; i++) {
+      isMarkerFileInUseList.add(false);
+    }
+    this.isMarkerFileInUseList = 
Collections.synchronizedList(isMarkerFileInUseList);
+  }
+
+  public Set<String> getAllMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>());
+  }
+
+  public Set<String> getCreateAndMergeMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>()).stream()
+        .filter(markerName -> !markerName.endsWith(IOType.APPEND.name()))
+        .collect(Collectors.toSet());
+  }
+
+  public CompletableFuture<String> createMarker(Context context, String 
markerDirPath, String markerName) {
+    LOG.info("Request: create marker " + markerDirPath + " " + markerName);
+    CreateMarkerCompletableFuture future = new 
CreateMarkerCompletableFuture(context, markerDirPath, markerName);
+    synchronized (createMarkerRequestlockObject) {
+      createMarkerFutures.add(future);
+      long currTimeMs = System.currentTimeMillis();
+      if (currTimeMs >= nextBatchProcessTimeMs - MARGIN_TIME_MS) {
+        nextBatchProcessTimeMs += batchIntervalMs * (Math.max((currTimeMs - 
nextBatchProcessTimeMs) / batchIntervalMs + 1L, 1L));
+
+        long waitMs = nextBatchProcessTimeMs - System.currentTimeMillis();
+        executorService.schedule(
+            new BatchCreateMarkerRunnable(), Math.max(0L, waitMs), 
TimeUnit.MILLISECONDS);
+        LOG.info("Wait for " + waitMs + " ms, next batch time: " + 
nextBatchProcessTimeMs);
+      }
+    }
+    return future;
+  }
+
+  public Boolean deleteMarkers(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          for (String subPathStr: markerDirSubPaths) {
+            fileSystem.delete(new Path(subPathStr), true);
+          }
+        }
+
+        boolean result = fileSystem.delete(markerDirPath, true);
+        LOG.info("Removing marker directory at " + markerDirPath);
+        return result;
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+    return false;
+  }
+
+  private Set<String> getAllMarkersFromFile(String markerDirPath) {
+    LOG.info("Get all markers from " + markerDirPath);
+    Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME);
+    Set<String> markers = new HashSet<>();
+    try {
+      if (fileSystem.exists(markersFilePath)) {

Review comment:
       Fixed.

##########
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  private static final long MARGIN_TIME_MS = 10L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // {markerDirPath -> all markers}
+  private final Map<String, Set<String>> allMarkersMap = new HashMap<>();
+  // {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map<String, Map<Integer, StringBuilder>> fileMarkersMap = new 
HashMap<>();
+  private final List<CreateMarkerCompletableFuture> createMarkerFutures = new 
ArrayList<>();
+  private final List<Boolean> isMarkerFileInUseList;
+  private final long batchIntervalMs;
+  private final int parallelism;
+  private volatile Object createMarkerRequestlockObject = new Object();
+  private long nextBatchProcessTimeMs = 0L;
+
+  public MarkerHandler(Configuration conf, FileSystem fileSystem, 
FileSystemViewManager viewManager, Registry metricsRegistry,
+                       int batchNumThreads, long batchIntervalMs, int 
parallelism) throws IOException {
+    super(conf, fileSystem, viewManager);
+    LOG.info("*** MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+    LOG.info("*** MarkerHandler Params: batchNumThreads=" + batchNumThreads + 
" batchIntervalMs=" + batchIntervalMs + "ms");
+    this.metricsRegistry = metricsRegistry;
+    this.batchIntervalMs = batchIntervalMs;
+    this.parallelism = parallelism;
+    this.executorService = Executors.newScheduledThreadPool(batchNumThreads);
+    List<Boolean> isMarkerFileInUseList = new ArrayList<>(batchNumThreads);
+    for (int i = 0; i < batchNumThreads; i++) {
+      isMarkerFileInUseList.add(false);
+    }
+    this.isMarkerFileInUseList = 
Collections.synchronizedList(isMarkerFileInUseList);
+  }
+
+  public Set<String> getAllMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>());
+  }
+
+  public Set<String> getCreateAndMergeMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>()).stream()
+        .filter(markerName -> !markerName.endsWith(IOType.APPEND.name()))
+        .collect(Collectors.toSet());
+  }
+
+  public CompletableFuture<String> createMarker(Context context, String 
markerDirPath, String markerName) {
+    LOG.info("Request: create marker " + markerDirPath + " " + markerName);
+    CreateMarkerCompletableFuture future = new 
CreateMarkerCompletableFuture(context, markerDirPath, markerName);
+    synchronized (createMarkerRequestlockObject) {
+      createMarkerFutures.add(future);
+      long currTimeMs = System.currentTimeMillis();
+      if (currTimeMs >= nextBatchProcessTimeMs - MARGIN_TIME_MS) {
+        nextBatchProcessTimeMs += batchIntervalMs * (Math.max((currTimeMs - 
nextBatchProcessTimeMs) / batchIntervalMs + 1L, 1L));
+
+        long waitMs = nextBatchProcessTimeMs - System.currentTimeMillis();
+        executorService.schedule(
+            new BatchCreateMarkerRunnable(), Math.max(0L, waitMs), 
TimeUnit.MILLISECONDS);
+        LOG.info("Wait for " + waitMs + " ms, next batch time: " + 
nextBatchProcessTimeMs);
+      }
+    }
+    return future;
+  }
+
+  public Boolean deleteMarkers(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          for (String subPathStr: markerDirSubPaths) {
+            fileSystem.delete(new Path(subPathStr), true);
+          }
+        }
+
+        boolean result = fileSystem.delete(markerDirPath, true);
+        LOG.info("Removing marker directory at " + markerDirPath);
+        return result;
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+    return false;
+  }
+
+  private Set<String> getAllMarkersFromFile(String markerDirPath) {
+    LOG.info("Get all markers from " + markerDirPath);
+    Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME);
+    Set<String> markers = new HashSet<>();
+    try {
+      if (fileSystem.exists(markersFilePath)) {
+        LOG.info("Marker file exists: " + markersFilePath.toString());
+        FSDataInputStream fsDataInputStream = fileSystem.open(markersFilePath);
+        BufferedReader bufferedReader = new BufferedReader(new 
InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
+        markers = bufferedReader.lines().collect(Collectors.toSet());
+        bufferedReader.close();
+        fsDataInputStream.close();
+      } else {
+        LOG.info("Marker file not exist: " + markersFilePath.toString());
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read MARKERS file " + 
markerDirPath, e);
+    }
+    return markers;
+  }
+
+  private class CreateMarkerCompletableFuture extends 
CompletableFuture<String> {
+    private Context context;
+    private final String markerDirPath;
+    private final String markerName;
+    private boolean result;
+    private final long startTimeMs;
+
+    public CreateMarkerCompletableFuture(Context context, String 
markerDirPath, String markerName) {
+      super();
+      this.startTimeMs = System.currentTimeMillis();
+      this.context = context;
+      this.markerDirPath = markerDirPath;
+      this.markerName = markerName;
+      this.result = false;
+    }
+
+    public Context getContext() {
+      return context;
+    }
+
+    public String getMarkerDirPath() {
+      return markerDirPath;
+    }
+
+    public String getMarkerName() {
+      return markerName;
+    }
+
+    public boolean getResult() {
+      return result;
+    }
+
+    public void setResult(boolean result) {
+      LOG.info("*** Request queued for " + (System.currentTimeMillis() - 
startTimeMs) + " ms");
+      this.result = result;
+    }
+  }
+
+  private class BatchCreateMarkerRunnable implements Runnable {
+
+    @Override
+    public void run() {
+      LOG.info("Start processing create marker requests");
+      long startTimeMs = System.currentTimeMillis();
+      List<CreateMarkerCompletableFuture> futuresToRemove = new ArrayList<>();
+      Set<String> updatedMarkerDirPaths = new HashSet<>();

Review comment:
       For a single commit, the `updatedMarkerDirPaths` Set should contain only 
one marker directory, since all the creation requests share the same commit 
instant, which is used to construct the marker directory for the commit.
   
   I still use a Set here for the multi-commit use case, where marker creation 
requests from different commits are received, so the `MarkerHandler` needs to 
differentiate the commits.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
##########
@@ -40,51 +41,49 @@
   private static final Logger LOG = 
LogManager.getLogger(EmbeddedTimelineService.class);
 
   private int serverPort;
-  private int preferredPort;
   private String hostAddr;
   private HoodieEngineContext context;
   private final SerializableConfiguration hadoopConf;
-  private final FileSystemViewStorageConfig config;
-  private final HoodieMetadataConfig metadataConfig;
+  private final HoodieWriteConfig writeConfig;
   private final String basePath;
 
-  private final int numThreads;
-  private final boolean shouldCompressOutput;
-  private final boolean useAsync;
   private transient FileSystemViewManager viewManager;
   private transient TimelineService server;
 
-  public EmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort,
-                                 HoodieMetadataConfig metadataConfig, 
FileSystemViewStorageConfig config, String basePath,
-                                 int numThreads, boolean compressOutput, 
boolean useAsync) {
+  public EmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
     setHostAddr(embeddedTimelineServiceHostAddr);
     this.context = context;
-    this.config = config;
-    this.basePath = basePath;
-    this.metadataConfig = metadataConfig;
+    this.basePath = writeConfig.getBasePath();
     this.hadoopConf = context.getHadoopConf();
     this.viewManager = createViewManager();
-    this.preferredPort = embeddedTimelineServerPort;
-    this.numThreads = numThreads;
-    this.shouldCompressOutput = compressOutput;
-    this.useAsync = useAsync;
+    this.writeConfig = writeConfig;
   }
 
   private FileSystemViewManager createViewManager() {
     // Using passed-in configs to build view storage configs
     FileSystemViewStorageConfig.Builder builder =
-        
FileSystemViewStorageConfig.newBuilder().fromProperties(config.getProps());
+        
FileSystemViewStorageConfig.newBuilder().fromProperties(writeConfig.getClientSpecifiedViewStorageConfig().getProps());
     FileSystemViewStorageType storageType = builder.build().getStorageType();
     if (storageType.equals(FileSystemViewStorageType.REMOTE_ONLY)
         || storageType.equals(FileSystemViewStorageType.REMOTE_FIRST)) {
       // Reset to default if set to Remote
       builder.withStorageType(FileSystemViewStorageType.MEMORY);
     }
-    return FileSystemViewManager.createViewManager(context, metadataConfig, 
builder.build(), basePath);
+    return FileSystemViewManager.createViewManager(context, 
writeConfig.getMetadataConfig(), builder.build(), basePath);
   }
 
   public void startServer() throws IOException {
-    server = new TimelineService(preferredPort, viewManager, 
hadoopConf.newCopy(), numThreads, shouldCompressOutput, useAsync);
+    TimelineService.Config timelineServiceConf = new TimelineService.Config();
+    timelineServiceConf.serverPort = 
writeConfig.getEmbeddedTimelineServerPort();
+    timelineServiceConf.numThreads = 
writeConfig.getEmbeddedTimelineServerThreads();
+    timelineServiceConf.compress = 
writeConfig.getEmbeddedTimelineServerCompressOutput();;
+    timelineServiceConf.async = 
writeConfig.getEmbeddedTimelineServerUseAsync();
+    timelineServiceConf.markerBatchNumThreads = 
writeConfig.getMarkersTimelineBasedBatchNumThreads();

Review comment:
       Yes.  I added a `Builder` class for the `TimelineService.Config`.  The 
member variable scope of `TimelineService.Config`, i.e., public, is not changed, 
given that the variables are used in parsing the CLI arguments.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectMarkerFiles.java
##########
@@ -175,70 +156,29 @@ public static String stripMarkerSuffix(String path) {
     return markerFiles;
   }
 
-  private String stripMarkerFolderPrefix(String fullMarkerPath) {
-    
ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
-    String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
-        new Path(String.format("%s/%s/%s", basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
-    int begin = fullMarkerPath.indexOf(markerRootPath);
-    ValidationUtils.checkArgument(begin >= 0,
-        "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected 
Marker Root=" + markerRootPath);
-    return fullMarkerPath.substring(begin + markerRootPath.length() + 1);
-  }
-
-  /**
-   * The marker path will be 
<base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
-   */
-  public Path create(String partitionPath, String dataFileName, IOType type) {
+  protected Path create(String partitionPath, String dataFileName, IOType 
type, boolean checkIfExists) {
+    LOG.info("^^^ [direct] Create marker file : " + partitionPath + " " + 
dataFileName);
+    long startTimeMs = System.currentTimeMillis();
     Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
+    Path dirPath = markerPath.getParent();
     try {
-      LOG.info("Creating Marker Path=" + markerPath);
-      fs.create(markerPath, false).close();
+      if (!fs.exists(dirPath)) {
+        fs.mkdirs(dirPath); // create a new partition as needed.
+      }
     } catch (IOException e) {
-      throw new HoodieException("Failed to create marker file " + markerPath, 
e);
+      throw new HoodieIOException("Failed to make dir " + dirPath, e);
     }
-    return markerPath;
-  }
-
-  /**
-   * The marker path will be 
<base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
-   *
-   * @return true if the marker file creates successfully,
-   * false if it already exists
-   */
-  public boolean createIfNotExists(String partitionPath, String dataFileName, 
IOType type) {
-    Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
     try {
-      if (fs.exists(markerPath)) {
+      if (checkIfExists && fs.exists(markerPath)) {
         LOG.warn("Marker Path=" + markerPath + " already exists, cancel 
creation");
-        return false;
+        return null;

Review comment:
       Resolved.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


> Re-implement marker files via timeline server
> ---------------------------------------------
>
>                 Key: HUDI-1138
>                 URL: https://issues.apache.org/jira/browse/HUDI-1138
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Vinoth Chandar
>            Assignee: Ethan Guo
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>
> Even as you can argue that RFC-15/consolidated metadata removes the need for 
> deleting partial files written due to Spark task failures/stage retries, it 
> will still leave extra files inside the table (and users will pay for it 
> every month), and we need the marker mechanism to be able to delete these 
> partial files. 
> Here we explore whether we can improve the current marker file mechanism, 
> which creates one marker file per data file written, by 
> delegating the createMarker() call to the driver/timeline server and having it 
> write marker metadata into a single file handle that is flushed for 
> durability guarantees.
>  
> P.S.: I was tempted to think the Spark listener mechanism could help us deal with 
> failed tasks, but it has no guarantees. The writer job could die without 
> deleting a partial file. I.e., it can improve things, but it can't provide 
> guarantees. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to