[ https://issues.apache.org/jira/browse/HUDI-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385847#comment-17385847 ]

ASF GitHub Bot commented on HUDI-1138:
--------------------------------------

vinothchandar commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r675261127



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectMarkerFiles.java
##########
@@ -175,70 +164,30 @@ public static String stripMarkerSuffix(String path) {
     return markerFiles;
   }
 
-  private String stripMarkerFolderPrefix(String fullMarkerPath) {
-    ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
-    String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
-        new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
-    int begin = fullMarkerPath.indexOf(markerRootPath);
-    ValidationUtils.checkArgument(begin >= 0,
-        "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected Marker Root=" + markerRootPath);
-    return fullMarkerPath.substring(begin + markerRootPath.length() + 1);
-  }
-
-  /**
-   * The marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
-   */
-  public Path create(String partitionPath, String dataFileName, IOType type) {
+  @Override
+  protected Option<Path> create(String partitionPath, String dataFileName, IOType type, boolean checkIfExists) {
+    LOG.info("[direct] Create marker file : " + partitionPath + " " + dataFileName);
+    long startTimeMs = System.currentTimeMillis();

Review comment:
       let's use `HoodieTimer`?

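The reviewer's suggestion above is to replace raw `System.currentTimeMillis()` arithmetic with Hudi's `HoodieTimer` utility. The exact `HoodieTimer` API is not shown in this thread, so the class below is a minimal stand-in that only illustrates the stopwatch pattern being asked for, not Hudi's implementation:

```java
// Minimal sketch of a stopwatch-style timer modeled on the pattern behind
// Hudi's HoodieTimer (hypothetical stand-in; the real class may differ).
public class TimerSketch {
  private long startMs = -1L;

  // Record the start instant and return this for chaining.
  public TimerSketch startTimer() {
    startMs = System.currentTimeMillis();
    return this;
  }

  // Return elapsed milliseconds since startTimer() was called.
  public long endTimer() {
    if (startMs < 0) {
      throw new IllegalStateException("Timer was never started");
    }
    return System.currentTimeMillis() - startMs;
  }

  public static void main(String[] args) throws InterruptedException {
    TimerSketch timer = new TimerSketch().startTimer();
    Thread.sleep(10); // simulated work
    System.out.println("elapsed >= 10ms: " + (timer.endTimer() >= 10));
  }
}
```

The advantage over inline `currentTimeMillis()` subtraction is that the start/stop bookkeeping lives in one place and cannot be mismatched across log statements.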
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -222,6 +224,30 @@
          + "files from lake storage, before committing the write. Reduce this value, if the high number of tasks incur delays for smaller tables "
          + "or low latency writes.");
 
+  public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+      .key("hoodie.markers.io.mode")

Review comment:
       `hoodie.write.markers.type`? 

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
##########
@@ -67,10 +67,7 @@ private static EmbeddedTimelineService startTimelineService(
     LOG.info("Starting Timeline service !!");
     Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
     EmbeddedTimelineService timelineService = new EmbeddedTimelineService(
-        context, hostAddr.orElse(null),config.getEmbeddedTimelineServerPort(),
-        config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath(),
-        config.getEmbeddedTimelineServerThreads(), config.getEmbeddedTimelineServerCompressOutput(),
-        config.getEmbeddedTimelineServerUseAsync());
+        context, hostAddr.orElse(null), config);

Review comment:
       thank you! it was getting a bit crazy

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
##########
@@ -56,8 +58,8 @@ public SparkMarkerBasedRollbackStrategy(HoodieTable<T, JavaRDD<HoodieRecord<T>>,
   public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
     JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
     try {
-      MarkerFiles markerFiles = new MarkerFiles(table, instantToRollback.getTimestamp());
-      List<String> markerFilePaths = markerFiles.allMarkerFilePaths();
+      MarkerFiles markerFiles = MarkerFilesFactory.get(config.getMarkersIOMode(), table, instantToRollback.getTimestamp());
+      List<String> markerFilePaths = new ArrayList<>(markerFiles.allMarkerFilePaths());

Review comment:
       `CollectionUtils.createImmutableList()` for these things?

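`CollectionUtils.createImmutableList()` is a Hudi helper whose exact signature is not shown in this thread; a JDK-only sketch of the same defensive-copy-then-wrap idea the reviewer is suggesting instead of a plain `new ArrayList<>(...)`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ImmutableListSketch {
  // JDK-only stand-in for the suggested helper: copy the elements first,
  // then wrap so callers can neither mutate the result nor observe later
  // changes to the source collection.
  public static <T> List<T> createImmutableList(List<T> elements) {
    return Collections.unmodifiableList(new ArrayList<>(elements));
  }

  public static void main(String[] args) {
    List<String> markerPaths =
        createImmutableList(Arrays.asList("2019/04/25/f1.marker.CREATE"));
    try {
      markerPaths.add("oops");
    } catch (UnsupportedOperationException e) {
      System.out.println("immutable as expected");
    }
  }
}
```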
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectMarkerFiles.java
##########
@@ -175,70 +164,30 @@ public static String stripMarkerSuffix(String path) {
     return markerFiles;
   }
 
-  private String stripMarkerFolderPrefix(String fullMarkerPath) {
-    ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
-    String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
-        new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
-    int begin = fullMarkerPath.indexOf(markerRootPath);
-    ValidationUtils.checkArgument(begin >= 0,
-        "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected Marker Root=" + markerRootPath);
-    return fullMarkerPath.substring(begin + markerRootPath.length() + 1);
-  }
-
-  /**
-   * The marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
-   */
-  public Path create(String partitionPath, String dataFileName, IOType type) {
+  @Override
+  protected Option<Path> create(String partitionPath, String dataFileName, IOType type, boolean checkIfExists) {
+    LOG.info("[direct] Create marker file : " + partitionPath + " " + dataFileName);
+    long startTimeMs = System.currentTimeMillis();
     Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
+    Path dirPath = markerPath.getParent();
     try {
-      LOG.info("Creating Marker Path=" + markerPath);
-      fs.create(markerPath, false).close();
+      if (!fs.exists(dirPath)) {
Review comment:
       is there a reason to change this? the `exists()` incurs an RPC

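One common way to avoid the extra `exists()` round trip the reviewer mentions is to attempt the create directly with overwrite disabled and treat the "already exists" failure as the answer. A minimal sketch using `java.nio.file` (Hadoop's `FileSystem.create(path, false)` has store-specific semantics, so this is only an illustration of the idea, not Hudi's code):

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CreateIfAbsentSketch {
  // Attempt the create directly and interpret FileAlreadyExistsException,
  // instead of paying for a separate exists() round trip first.
  public static boolean createMarkerIfAbsent(Path markerPath) throws IOException {
    try {
      Files.createDirectories(markerPath.getParent()); // no-op if present
      Files.createFile(markerPath); // atomic create-if-absent
      return true;
    } catch (FileAlreadyExistsException e) {
      return false; // another task already wrote this marker
    }
  }

  public static void main(String[] args) throws IOException {
    Path marker = Files.createTempDirectory("markers").resolve("f1.marker.CREATE");
    System.out.println(createMarkerIfAbsent(marker)); // true
    System.out.println(createMarkerIfAbsent(marker)); // false
  }
}
```

Whether this actually saves a round trip on a given object store depends on how that store implements atomic create, which is exactly the trade-off under discussion.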
##########
File path: hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
##########
@@ -52,36 +53,50 @@
 
   private int serverPort;
   private Configuration conf;
+  private transient HoodieEngineContext context;
   private transient FileSystem fs;
   private transient Javalin app = null;
   private transient FileSystemViewManager fsViewsManager;
   private final int numThreads;
   private final boolean shouldCompressOutput;
   private final boolean useAsync;
+  private final int markerBatchNumThreads;

Review comment:
       it's okay for the timeline service to know these. it's core functionality provided by the timeline server

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
##########
@@ -177,6 +186,7 @@ public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaC
         break;
     }
     String content = response.returnContent().asString();
+    LOG.info("Got response in " + (System.currentTimeMillis() - startTimeMs) + " ms");

Review comment:
       HoodieTimer?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerFiles.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Operates on marker files for a given write action (commit, delta commit, compaction).
+ *
+ * This abstract class provides abstract methods for different marker file operations, so that
+ * different marker file mechanisms can be implemented.
+ */
+public abstract class MarkerFiles implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);
+
+  protected final String basePath;
+  protected final transient Path markerDirPath;
+  protected final String instantTime;
+
+  public MarkerFiles(String basePath, String markerFolderPath, String instantTime) {
+    this.basePath = basePath;
+    this.markerDirPath = new Path(markerFolderPath);
+    this.instantTime = instantTime;
+  }
+
+  /**
+   * The marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.

Review comment:
       +1. Should we leave this abstract?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -222,6 +224,30 @@
          + "files from lake storage, before committing the write. Reduce this value, if the high number of tasks incur delays for smaller tables "
          + "or low latency writes.");
 
+  public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+      .key("hoodie.markers.io.mode")

Review comment:
       helps us capture that this is about writing. 

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -71,6 +72,7 @@
 public class HoodieWriteConfig extends HoodieConfig {
 
   private static final long serialVersionUID = 0L;
+  private static final String VERSION_0_9_0 = "0.9.0";

Review comment:
       i mulled this before. given we are not doing this across the board, do you want to keep it consistent and remove this constant?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectMarkerFiles.java
##########
@@ -175,70 +164,30 @@ public static String stripMarkerSuffix(String path) {
     return markerFiles;
   }
 
-  private String stripMarkerFolderPrefix(String fullMarkerPath) {
-    ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
-    String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
-        new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
-    int begin = fullMarkerPath.indexOf(markerRootPath);
-    ValidationUtils.checkArgument(begin >= 0,
-        "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected Marker Root=" + markerRootPath);
-    return fullMarkerPath.substring(begin + markerRootPath.length() + 1);
-  }
-
-  /**
-   * The marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
-   */
-  public Path create(String partitionPath, String dataFileName, IOType type) {
+  @Override
+  protected Option<Path> create(String partitionPath, String dataFileName, IOType type, boolean checkIfExists) {
+    LOG.info("[direct] Create marker file : " + partitionPath + " " + dataFileName);
+    long startTimeMs = System.currentTimeMillis();
     Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
+    Path dirPath = markerPath.getParent();
     try {
-      LOG.info("Creating Marker Path=" + markerPath);
-      fs.create(markerPath, false).close();
+      if (!fs.exists(dirPath)) {
+        fs.mkdirs(dirPath); // create a new partition as needed.
+      }
     } catch (IOException e) {
-      throw new HoodieException("Failed to create marker file " + markerPath, e);
+      throw new HoodieIOException("Failed to make dir " + dirPath, e);
     }
-    return markerPath;
-  }
-
-  /**
-   * The marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
-   *
-   * @return true if the marker file creates successfully,
-   * false if it already exists
-   */
-  public boolean createIfNotExists(String partitionPath, String dataFileName, IOType type) {
-    Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
     try {
-      if (fs.exists(markerPath)) {
-        LOG.warn("Marker Path=" + markerPath + " already exists, cancel creation");
-        return false;
+      if (checkIfExists && fs.exists(markerPath)) {
+        LOG.warn("Marker path " + markerPath + " already exists, cancel creation");

Review comment:
       Hmmm. wondering why this would happen. log appends?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerIOMode.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+/**
+ * Marker IO operation mode.

Review comment:
       `MarkerType`?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerFiles.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+public abstract class MarkerFiles implements Serializable {

Review comment:
       I wonder if we should rename this class to something like `WriteMarkers` (now that we don't really create marker files per se for each marker). Then the subclasses are `DirectWriteMarkers` and `TimelineServerWriteMarkers`? wdyt?

##########
File path: hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
##########
@@ -158,7 +283,9 @@ public int startService() throws IOException {
       app.disableDynamicGzip();
     }
 
-    RequestHandler requestHandler = new RequestHandler(app, conf, fsViewsManager, useAsync);
+    RequestHandler requestHandler = new RequestHandler(
+        app, conf, context, fs, fsViewsManager, useAsync, markerBatchNumThreads,
+        markerBatchIntervalMs, markerDeleteParallelism);
     app.get("/", ctx -> ctx.result("Hello World"));

Review comment:
       lol. hello world on `/`. funny. `Hello Hudi` at least as an easter egg?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -222,6 +224,30 @@
          + "files from lake storage, before committing the write. Reduce this value, if the high number of tasks incur delays for smaller tables "
          + "or low latency writes.");
 
+  public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+      .key("hoodie.markers.io.mode")
+      .defaultValue(MarkerIOMode.DIRECT.toString())
+      .sinceVersion(VERSION_0_9_0)
+      .withDocumentation("Marker IO mode to use.  Two modes are supported: "
+          + "- DIRECT: individual marker file corresponding to each data file is directly "

Review comment:
       In the timeline-based mode, we don't really create marker files, just markers that are stored in files.
   `limited number of underlying files for efficiency`

   and let's rename `TIMELINE_BASED` to `TIMELINE_SERVER_BASED`

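If the rename suggested above is adopted, the enum might look roughly like this (names follow the review suggestion only; the merged code may differ):

```java
public class MarkerTypeSketch {
  // Hypothetical shape of the marker-type enum after the suggested rename.
  enum MarkerType {
    DIRECT,                // one marker file per data file, written straight to storage
    TIMELINE_SERVER_BASED  // markers batched by the timeline server into a few files
  }

  public static void main(String[] args) {
    // Config values are stored as strings, so they round-trip through valueOf.
    MarkerType type = MarkerType.valueOf("TIMELINE_SERVER_BASED");
    System.out.println(type);
  }
}
```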
##########
File path: hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
##########
@@ -392,6 +413,44 @@ private void registerFileSlicesAPI() {
     }, true));
   }
 
+  private void registerMarkerAPI() {

Review comment:
       IMO we should directly hit these without going through the filesystem view APIs

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
##########
@@ -40,51 +41,50 @@
   private static final Logger LOG = LogManager.getLogger(EmbeddedTimelineService.class);
 
   private int serverPort;
-  private int preferredPort;
   private String hostAddr;
   private HoodieEngineContext context;
   private final SerializableConfiguration hadoopConf;
-  private final FileSystemViewStorageConfig config;
-  private final HoodieMetadataConfig metadataConfig;
+  private final HoodieWriteConfig writeConfig;
   private final String basePath;
 
-  private final int numThreads;
-  private final boolean shouldCompressOutput;
-  private final boolean useAsync;
   private transient FileSystemViewManager viewManager;
   private transient TimelineService server;
 
-  public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort,
-                                 HoodieMetadataConfig metadataConfig, FileSystemViewStorageConfig config, String basePath,
-                                 int numThreads, boolean compressOutput, boolean useAsync) {
+  public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
     setHostAddr(embeddedTimelineServiceHostAddr);
     this.context = context;
-    this.config = config;
-    this.basePath = basePath;
-    this.metadataConfig = metadataConfig;
+    this.writeConfig = writeConfig;
+    this.basePath = writeConfig.getBasePath();
     this.hadoopConf = context.getHadoopConf();
     this.viewManager = createViewManager();
-    this.preferredPort = embeddedTimelineServerPort;
-    this.numThreads = numThreads;
-    this.shouldCompressOutput = compressOutput;
-    this.useAsync = useAsync;
   }
 
   private FileSystemViewManager createViewManager() {
     // Using passed-in configs to build view storage configs
     FileSystemViewStorageConfig.Builder builder =
-        FileSystemViewStorageConfig.newBuilder().fromProperties(config.getProps());
+        FileSystemViewStorageConfig.newBuilder().fromProperties(writeConfig.getClientSpecifiedViewStorageConfig().getProps());
     FileSystemViewStorageType storageType = builder.build().getStorageType();
     if (storageType.equals(FileSystemViewStorageType.REMOTE_ONLY)
         || storageType.equals(FileSystemViewStorageType.REMOTE_FIRST)) {
       // Reset to default if set to Remote
       builder.withStorageType(FileSystemViewStorageType.MEMORY);
     }
-    return FileSystemViewManager.createViewManager(context, metadataConfig, builder.build(), basePath);
+    return FileSystemViewManager.createViewManager(context, writeConfig.getMetadataConfig(), builder.build(), basePath);
   }
 
   public void startServer() throws IOException {
-    server = new TimelineService(preferredPort, viewManager, hadoopConf.newCopy(), numThreads, shouldCompressOutput, useAsync);
+    TimelineService.Config timelineServiceConf = TimelineService.Config.builder()
+        .serverPort(writeConfig.getEmbeddedTimelineServerPort())
+        .numThreads(writeConfig.getEmbeddedTimelineServerThreads())
+        .compress(writeConfig.getEmbeddedTimelineServerCompressOutput())
+        .async(writeConfig.getEmbeddedTimelineServerUseAsync())
+        .markerBatchNumThreads(writeConfig.getMarkersTimelineBasedBatchNumThreads())

Review comment:
       the fewer switches the better. and code should then lazily consume these values, when actually using that functionality

##########
File path: hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
##########
@@ -118,8 +133,118 @@ public TimelineService(Config config) throws IOException {
    @Parameter(names = {"--compress"}, description = "Compress output using gzip")
    public boolean compress = true;
 
+    @Parameter(names = {"--marker-batch-threads", "-mbt"}, description = "Number of threads to use for batch processing marker creation requests")
+    public int markerBatchNumThreads = 20;
+
+    @Parameter(names = {"--marker-batch-interval-ms", "-mbi"}, description = "The interval in milliseconds between two batch processing of marker creation requests")
+    public long markerBatchIntervalMs = 50;
+
+    @Parameter(names = {"--marker-parallelism", "-mdp"}, description = "Parallelism to use for reading and deleting marker files")
+    public int markerParallelism = 100;
+
     @Parameter(names = {"--help", "-h"})
     public Boolean help = false;
+
+    public static Builder builder() {
+      return new Builder();
+    }
+
+    /**
+     * Builder of Config class.
+     */
+    public static class Builder {
+      private Integer serverPort = 26754;
+      private FileSystemViewStorageType viewStorageType = FileSystemViewStorageType.SPILLABLE_DISK;
+      private Integer maxViewMemPerTableInMB = 2048;
+      private Double memFractionForCompactionPerTable = 0.001;
+      private String baseStorePathForFileGroups = FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue();
+      private String rocksDBPath = FileSystemViewStorageConfig.ROCKSDB_BASE_PATH_PROP.defaultValue();
+      private int numThreads = DEFAULT_NUM_THREADS;
+      private boolean async = false;
+      private boolean compress = true;
+      private int markerBatchNumThreads = 20;
+      private long markerBatchIntervalMs = 50;
+      private int markerParallelism = 100;
+
+      public Builder() {}
+
+      public Builder serverPort(int serverPort) {
+        this.serverPort = serverPort;
+        return this;
+      }
+
+      public Builder viewStorageType(FileSystemViewStorageType viewStorageType) {
+        this.viewStorageType = viewStorageType;
+        return this;
+      }
+
+      public Builder maxViewMemPerTableInMB(int maxViewMemPerTableInMB) {
+        this.maxViewMemPerTableInMB = maxViewMemPerTableInMB;
+        return this;
+      }
+
+      public Builder memFractionForCompactionPerTable(double memFractionForCompactionPerTable) {
+        this.memFractionForCompactionPerTable = memFractionForCompactionPerTable;
+        return this;
+      }
+
+      public Builder baseStorePathForFileGroups(String baseStorePathForFileGroups) {
+        this.baseStorePathForFileGroups = baseStorePathForFileGroups;
+        return this;
+      }
+
+      public Builder rocksDBPath(String rocksDBPath) {
+        this.rocksDBPath = rocksDBPath;
+        return this;
+      }
+
+      public Builder numThreads(int numThreads) {
+        this.numThreads = numThreads;
+        return this;
+      }
+
+      public Builder async(boolean async) {
+        this.async = async;
+        return this;
+      }
+
+      public Builder compress(boolean compress) {
+        this.compress = compress;
+        return this;
+      }
+
+      public Builder markerBatchNumThreads(int markerBatchNumThreads) {
+        this.markerBatchNumThreads = markerBatchNumThreads;
+        return this;
+      }
+
+      public Builder markerBatchIntervalMs(long markerBatchIntervalMs) {
+        this.markerBatchIntervalMs = markerBatchIntervalMs;
+        return this;
+      }
+
+      public Builder markerParallelism(int markerParallelism) {
+        this.markerParallelism = markerParallelism;
+        return this;
+      }
+
+      public Config build() {

Review comment:
       do we really need this new config builder? this is so we can construct in the embedded mode?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
##########
@@ -185,4 +186,12 @@
    * Filegroups that are in pending clustering.
    */
   Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering();
+
+  Set<String> getAllMarkerFilePaths(String markerDirPath);

Review comment:
       this is not a public-facing feature? Can we avoid adding these methods to `TableFileSystemView` - this interface has a very specific purpose and is used everywhere.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerFilesFactory.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * A factory to generate a {@code MarkerFiles} instance based on the {@code MarkerIOMode}.
+ */
+public class MarkerFilesFactory {
+  private static final Logger LOG = LogManager.getLogger(MarkerFilesFactory.class);
+
+  /**
+   * @param mode the operation mode for marker files
+   * @param table {@code HoodieTable} instance
+   * @param instantTime current instant time
+   * @return  {@code MarkerFiles} instance based on the {@code MarkerIOMode}
+   */
+  public static MarkerFiles get(MarkerIOMode mode, HoodieTable table, String instantTime) {
+    LOG.info("Instantiated MarkerFiles with mode: " + mode.toString());
+    switch (mode) {
+      case DIRECT:
+        return new DirectMarkerFiles(table, instantTime);
+      case TIMELINE_BASED:
+        return new TimelineBasedMarkerFiles(table, instantTime);

Review comment:
       Timeline has a different meaning in Hudi; it's what you find in `.hoodie`. `TimelineServer...`?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
##########
@@ -40,51 +41,50 @@
   private static final Logger LOG = LogManager.getLogger(EmbeddedTimelineService.class);
 
   private int serverPort;
-  private int preferredPort;
   private String hostAddr;
   private HoodieEngineContext context;
   private final SerializableConfiguration hadoopConf;
-  private final FileSystemViewStorageConfig config;
-  private final HoodieMetadataConfig metadataConfig;
+  private final HoodieWriteConfig writeConfig;
   private final String basePath;
 
-  private final int numThreads;
-  private final boolean shouldCompressOutput;
-  private final boolean useAsync;
   private transient FileSystemViewManager viewManager;
   private transient TimelineService server;
 
-  public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort,
-                                 HoodieMetadataConfig metadataConfig, FileSystemViewStorageConfig config, String basePath,
-                                 int numThreads, boolean compressOutput, boolean useAsync) {
+  public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
     setHostAddr(embeddedTimelineServiceHostAddr);
     this.context = context;
-    this.config = config;
-    this.basePath = basePath;
-    this.metadataConfig = metadataConfig;
+    this.writeConfig = writeConfig;
+    this.basePath = writeConfig.getBasePath();
     this.hadoopConf = context.getHadoopConf();
     this.viewManager = createViewManager();
-    this.preferredPort = embeddedTimelineServerPort;
-    this.numThreads = numThreads;
-    this.shouldCompressOutput = compressOutput;
-    this.useAsync = useAsync;
   }
 
   private FileSystemViewManager createViewManager() {
     // Using passed-in configs to build view storage configs
     FileSystemViewStorageConfig.Builder builder =
-        FileSystemViewStorageConfig.newBuilder().fromProperties(config.getProps());
+        FileSystemViewStorageConfig.newBuilder().fromProperties(writeConfig.getClientSpecifiedViewStorageConfig().getProps());
     FileSystemViewStorageType storageType = builder.build().getStorageType();
     if (storageType.equals(FileSystemViewStorageType.REMOTE_ONLY)
         || storageType.equals(FileSystemViewStorageType.REMOTE_FIRST)) {
       // Reset to default if set to Remote
       builder.withStorageType(FileSystemViewStorageType.MEMORY);
     }
-    return FileSystemViewManager.createViewManager(context, metadataConfig, builder.build(), basePath);
+    return FileSystemViewManager.createViewManager(context, writeConfig.getMetadataConfig(), builder.build(), basePath);
   }
 
   public void startServer() throws IOException {
-    server = new TimelineService(preferredPort, viewManager, hadoopConf.newCopy(), numThreads, shouldCompressOutput, useAsync);
+    TimelineService.Config timelineServiceConf = TimelineService.Config.builder()
+        .serverPort(writeConfig.getEmbeddedTimelineServerPort())
+        .numThreads(writeConfig.getEmbeddedTimelineServerThreads())
+        .compress(writeConfig.getEmbeddedTimelineServerCompressOutput())
+        .async(writeConfig.getEmbeddedTimelineServerUseAsync())
+        .markerBatchNumThreads(writeConfig.getMarkersTimelineBasedBatchNumThreads())

Review comment:
       i think we should do this. if we don't use it, we don't use it. 

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -423,7 +424,8 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
   protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
     try {
       // Delete the marker directory for the instant.
-      new MarkerFiles(table, instantTime).quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+      MarkerFilesFactory.get(config.getMarkersIOMode(), table, instantTime)

Review comment:
       do we really have to? as long as we can automatically "detect" the marker versions, it's fine right? Why migrate to this one (which involves one round of listing), just to do rollbacks?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


> Re-implement marker files via timeline server
> ---------------------------------------------
>
>                 Key: HUDI-1138
>                 URL: https://issues.apache.org/jira/browse/HUDI-1138
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Vinoth Chandar
>            Assignee: Ethan Guo
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>
> Even as you can argue that RFC-15/consolidated metadata removes the need for
> deleting partial files written due to Spark task failures/stage retries, it
> will still leave extra files inside the table (and users will pay for them
> every month), so we need the marker mechanism to be able to delete these
> partial files.
> Here we explore whether we can improve the current marker file mechanism,
> which creates one marker file per data file written, by delegating the
> createMarker() call to the driver/timeline server and having it write marker
> metadata into a single file handle that is flushed for durability guarantees.
>  
> P.S: I was tempted to think the Spark listener mechanism can help us deal
> with failed tasks, but it has no guarantees. The writer job could die without
> deleting a partial file. i.e., it can improve things, but can't provide
> guarantees.
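
The batching idea described in the issue (many createMarker() calls funneled into a single file handle) can be sketched as follows. This is a simplified, single-process illustration of the concept only, not the actual timeline-server implementation, which batches marker-creation requests arriving over HTTP on a periodic flusher:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class MarkerBatchSketch {
  private final List<String> pending = new ArrayList<>();
  private final Path markersFile;

  public MarkerBatchSketch(Path markersFile) {
    this.markersFile = markersFile;
  }

  // Writers call this instead of creating one file per marker.
  public synchronized void createMarker(String markerName) {
    pending.add(markerName);
  }

  // A periodic flusher appends the accumulated batch to a single file,
  // so N markers cost one write instead of N file creations.
  public synchronized void flush() throws IOException {
    if (pending.isEmpty()) {
      return;
    }
    Files.write(markersFile, pending,
        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    pending.clear();
  }

  public static void main(String[] args) throws IOException {
    Path file = Files.createTempFile("MARKERS", "0");
    MarkerBatchSketch markers = new MarkerBatchSketch(file);
    markers.createMarker("2019/04/25/f1.marker.CREATE");
    markers.createMarker("2019/04/25/f2.marker.MERGE");
    markers.flush();
    System.out.println(Files.readAllLines(file).size()); // 2
  }
}
```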



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
