yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r676209554
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -222,6 +224,30 @@
+ "files from lake storage, before committing the write. Reduce this
value, if the high number of tasks incur delays for smaller tables "
+ "or low latency writes.");
+ public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+ .key("hoodie.markers.io.mode")
+ .defaultValue(MarkerIOMode.TIMELINE_BASED.toString())
+ .sinceVersion(VERSION_0_9_0)
Review comment:
I hardcoded the version.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerFiles.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Operates on marker files for a given write action (commit, delta commit,
compaction).
+ *
+ * This abstract class provides abstract methods of different marker file
operations, so that
+ * different marker file mechanism can be implemented.
+ */
+public abstract class MarkerFiles implements Serializable {
+
+ private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);
+
+ protected final String basePath;
+ protected final transient Path markerDirPath;
+ protected final String instantTime;
+
+ public MarkerFiles(String basePath, String markerFolderPath, String
instantTime) {
+ this.basePath = basePath;
+ this.markerDirPath = new Path(markerFolderPath);
+ this.instantTime = instantTime;
+ }
+
+ /**
+ * The marker path will be
<base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
Review comment:
These are the old javadocs; I have updated them to reflect the changes.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineBasedMarkerFiles.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Marker file operations of using timeline service as a proxy to create and
delete marker files.
+ * Each data file has a corresponding marker entry, which is stored in a
limited number of
+ * marker files maintained by the timeline service (each marker file contains
multiple marker
+ * entries).
+ */
+public class TimelineBasedMarkerFiles extends MarkerFiles {
+ private static final Logger LOG =
LogManager.getLogger(TimelineBasedMarkerFiles.class);
+ private final RemoteHoodieTableFileSystemView remoteFSView;
+
+ public TimelineBasedMarkerFiles(String basePath, String markerFolderPath,
String instantTime, RemoteHoodieTableFileSystemView remoteFSView) {
+ super(basePath, markerFolderPath, instantTime);
+ this.remoteFSView = remoteFSView;
+ }
+
+ public TimelineBasedMarkerFiles(HoodieTable table, String instantTime) {
+ this(table.getMetaClient().getBasePath(),
+ table.getMetaClient().getMarkerFolderPath(instantTime),
+ instantTime,
+ FileSystemViewManager.createRemoteFileSystemView(
+ table.getContext().getHadoopConf(),
+ table.getConfig().getViewStorageConfig(), table.getMetaClient())
+ );
+ }
+
+ @Override
+ public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism)
{
+ return remoteFSView.deleteMarkerDir(markerDirPath.toString());
+ }
+
+ @Override
+ public boolean doesMarkerDirExist() {
+ return remoteFSView.doesMarkerDirExist(markerDirPath.toString());
+ }
+
+ @Override
+ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context,
int parallelism) throws IOException {
+ Set<String> markerPaths =
remoteFSView.getCreateAndMergeMarkerFilePaths(markerDirPath.toString());
+ return
markerPaths.stream().map(MarkerFiles::stripMarkerSuffix).collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<String> allMarkerFilePaths() {
+ return remoteFSView.getAllMarkerFilePaths(markerDirPath.toString());
+ }
+
+ @Override
+ protected Option<Path> create(String partitionPath, String dataFileName,
IOType type, boolean checkIfExists) {
+ LOG.info("[timeline-based] Create marker file : " + partitionPath + " " +
dataFileName);
+ long startTimeMs = System.currentTimeMillis();
+ String markerFileName = getMarkerFileName(dataFileName, type);
+ boolean success = remoteFSView.createMarker(markerDirPath.toString(),
String.format("%s/%s", partitionPath, markerFileName));
+ LOG.info("[timeline-based] Created marker file in " +
(System.currentTimeMillis() - startTimeMs) + " ms");
Review comment:
Yes, one is for sending out the request and the other is after the
timeline server responds.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectMarkerFiles.java
##########
@@ -175,70 +164,30 @@ public static String stripMarkerSuffix(String path) {
return markerFiles;
}
- private String stripMarkerFolderPrefix(String fullMarkerPath) {
-
ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
- String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
- new Path(String.format("%s/%s/%s", basePath,
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
- int begin = fullMarkerPath.indexOf(markerRootPath);
- ValidationUtils.checkArgument(begin >= 0,
- "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected
Marker Root=" + markerRootPath);
- return fullMarkerPath.substring(begin + markerRootPath.length() + 1);
- }
-
- /**
- * The marker path will be
<base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
- */
- public Path create(String partitionPath, String dataFileName, IOType type) {
+ @Override
+ protected Option<Path> create(String partitionPath, String dataFileName,
IOType type, boolean checkIfExists) {
+ LOG.info("[direct] Create marker file : " + partitionPath + " " +
dataFileName);
+ long startTimeMs = System.currentTimeMillis();
Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
+ Path dirPath = markerPath.getParent();
try {
- LOG.info("Creating Marker Path=" + markerPath);
- fs.create(markerPath, false).close();
+ if (!fs.exists(dirPath)) {
+ fs.mkdirs(dirPath); // create a new partition as needed.
+ }
} catch (IOException e) {
- throw new HoodieException("Failed to create marker file " + markerPath,
e);
+ throw new HoodieIOException("Failed to make dir " + dirPath, e);
}
- return markerPath;
- }
-
- /**
- * The marker path will be
<base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
- *
- * @return true if the marker file creates successfully,
- * false if it already exists
- */
- public boolean createIfNotExists(String partitionPath, String dataFileName,
IOType type) {
- Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
try {
- if (fs.exists(markerPath)) {
- LOG.warn("Marker Path=" + markerPath + " already exists, cancel
creation");
- return false;
+ if (checkIfExists && fs.exists(markerPath)) {
+ LOG.warn("Marker path " + markerPath + " already exists, cancel
creation");
Review comment:
I reverted the changes here.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerIOMode.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+/**
+ * Marker IO operation mode.
Review comment:
Fixed.
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
##########
@@ -158,7 +283,9 @@ public int startService() throws IOException {
app.disableDynamicGzip();
}
- RequestHandler requestHandler = new RequestHandler(app, conf,
fsViewsManager, useAsync);
+ RequestHandler requestHandler = new RequestHandler(
+ app, conf, context, fs, fsViewsManager, useAsync,
markerBatchNumThreads,
+ markerBatchIntervalMs, markerDeleteParallelism);
app.get("/", ctx -> ctx.result("Hello World"));
Review comment:
lol. Fixed.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
##########
@@ -177,6 +186,7 @@ public RemoteHoodieTableFileSystemView(String server, int
port, HoodieTableMetaC
break;
}
String content = response.returnContent().asString();
+ LOG.info("Got response in " + (System.currentTimeMillis() - startTimeMs) +
" ms");
Review comment:
Removed.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
##########
@@ -67,10 +67,7 @@ private static EmbeddedTimelineService startTimelineService(
LOG.info("Starting Timeline service !!");
Option<String> hostAddr =
context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
EmbeddedTimelineService timelineService = new EmbeddedTimelineService(
- context, hostAddr.orElse(null),config.getEmbeddedTimelineServerPort(),
- config.getMetadataConfig(),
config.getClientSpecifiedViewStorageConfig(), config.getBasePath(),
- config.getEmbeddedTimelineServerThreads(),
config.getEmbeddedTimelineServerCompressOutput(),
- config.getEmbeddedTimelineServerUseAsync());
+ context, hostAddr.orElse(null), config);
Review comment:
no problem!
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
##########
@@ -68,30 +72,37 @@
private final TimelineHandler instantHandler;
private final FileSliceHandler sliceHandler;
private final BaseFileHandler dataFileHandler;
+ private final MarkerHandler markerHandler;
private Registry metricsRegistry = Registry.getRegistry("TimelineService");
private ScheduledExecutorService asyncResultService =
Executors.newSingleThreadScheduledExecutor();
private final boolean useAsync;
- public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager
viewManager, boolean useAsync) throws IOException {
+ public RequestHandler(Javalin app, Configuration conf, HoodieEngineContext
hoodieEngineContext,
Review comment:
I passed in the `TimelineService.Config` instance for now.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -71,6 +72,7 @@
public class HoodieWriteConfig extends HoodieConfig {
private static final long serialVersionUID = 0L;
+ private static final String VERSION_0_9_0 = "0.9.0";
Review comment:
Yes. I removed it.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
##########
@@ -40,51 +41,50 @@
private static final Logger LOG =
LogManager.getLogger(EmbeddedTimelineService.class);
private int serverPort;
- private int preferredPort;
private String hostAddr;
private HoodieEngineContext context;
private final SerializableConfiguration hadoopConf;
- private final FileSystemViewStorageConfig config;
- private final HoodieMetadataConfig metadataConfig;
+ private final HoodieWriteConfig writeConfig;
private final String basePath;
- private final int numThreads;
- private final boolean shouldCompressOutput;
- private final boolean useAsync;
private transient FileSystemViewManager viewManager;
private transient TimelineService server;
- public EmbeddedTimelineService(HoodieEngineContext context, String
embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort,
- HoodieMetadataConfig metadataConfig,
FileSystemViewStorageConfig config, String basePath,
- int numThreads, boolean compressOutput,
boolean useAsync) {
+ public EmbeddedTimelineService(HoodieEngineContext context, String
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
setHostAddr(embeddedTimelineServiceHostAddr);
this.context = context;
- this.config = config;
- this.basePath = basePath;
- this.metadataConfig = metadataConfig;
+ this.writeConfig = writeConfig;
+ this.basePath = writeConfig.getBasePath();
this.hadoopConf = context.getHadoopConf();
this.viewManager = createViewManager();
- this.preferredPort = embeddedTimelineServerPort;
- this.numThreads = numThreads;
- this.shouldCompressOutput = compressOutput;
- this.useAsync = useAsync;
}
private FileSystemViewManager createViewManager() {
// Using passed-in configs to build view storage configs
FileSystemViewStorageConfig.Builder builder =
-
FileSystemViewStorageConfig.newBuilder().fromProperties(config.getProps());
+
FileSystemViewStorageConfig.newBuilder().fromProperties(writeConfig.getClientSpecifiedViewStorageConfig().getProps());
FileSystemViewStorageType storageType = builder.build().getStorageType();
if (storageType.equals(FileSystemViewStorageType.REMOTE_ONLY)
|| storageType.equals(FileSystemViewStorageType.REMOTE_FIRST)) {
// Reset to default if set to Remote
builder.withStorageType(FileSystemViewStorageType.MEMORY);
}
- return FileSystemViewManager.createViewManager(context, metadataConfig,
builder.build(), basePath);
+ return FileSystemViewManager.createViewManager(context,
writeConfig.getMetadataConfig(), builder.build(), basePath);
}
public void startServer() throws IOException {
- server = new TimelineService(preferredPort, viewManager,
hadoopConf.newCopy(), numThreads, shouldCompressOutput, useAsync);
+ TimelineService.Config timelineServiceConf =
TimelineService.Config.builder()
+ .serverPort(writeConfig.getEmbeddedTimelineServerPort())
+ .numThreads(writeConfig.getEmbeddedTimelineServerThreads())
+ .compress(writeConfig.getEmbeddedTimelineServerCompressOutput())
+ .async(writeConfig.getEmbeddedTimelineServerUseAsync())
+
.markerBatchNumThreads(writeConfig.getMarkersTimelineBasedBatchNumThreads())
Review comment:
That makes sense. I changed it so that the marker-related write
configs are passed to the timeline server only if timeline-server-based markers are used.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -222,6 +224,30 @@
+ "files from lake storage, before committing the write. Reduce this
value, if the high number of tasks incur delays for smaller tables "
+ "or low latency writes.");
+ public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+ .key("hoodie.markers.io.mode")
Review comment:
Right. Fixed.
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
##########
@@ -52,36 +53,50 @@
private int serverPort;
private Configuration conf;
+ private transient HoodieEngineContext context;
private transient FileSystem fs;
private transient Javalin app = null;
private transient FileSystemViewManager fsViewsManager;
private final int numThreads;
private final boolean shouldCompressOutput;
private final boolean useAsync;
+ private final int markerBatchNumThreads;
+ private final long markerBatchIntervalMs;
+ private final int markerDeleteParallelism;
public int getServerPort() {
return serverPort;
}
- public TimelineService(int serverPort, FileSystemViewManager
globalFileSystemViewManager, Configuration conf,
- int numThreads, boolean compressOutput, boolean useAsync) throws
IOException {
+ private TimelineService(HoodieEngineContext context, int serverPort,
FileSystem fs,
Review comment:
Fixed.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -222,6 +224,30 @@
+ "files from lake storage, before committing the write. Reduce this
value, if the high number of tasks incur delays for smaller tables "
+ "or low latency writes.");
+ public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+ .key("hoodie.markers.io.mode")
+ .defaultValue(MarkerIOMode.DIRECT.toString())
+ .sinceVersion(VERSION_0_9_0)
+ .withDocumentation("Marker IO mode to use. Two modes are supported: "
+ + "- DIRECT: individual marker file corresponding to each data file
is directly "
Review comment:
Revised the documentation to be clearer.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerFilesFactory.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * A factory to generate {@code MarkerFiles} instance based on the {@code
MarkerIOMode}.
+ */
+public class MarkerFilesFactory {
+ private static final Logger LOG =
LogManager.getLogger(MarkerFilesFactory.class);
+
+ /**
+ * @param mode the operation mode for marker files
+ * @param table {@code HoodieTable} instance
+ * @param instantTime current instant time
+ * @return {@code MarkerFiles} instance based on the {@code MarkerIOMode}
+ */
+ public static MarkerFiles get(MarkerIOMode mode, HoodieTable table, String
instantTime) {
+ LOG.info("Instantiated MarkerFiles with mode: " + mode.toString());
+ switch (mode) {
+ case DIRECT:
+ return new DirectMarkerFiles(table, instantTime);
+ case TIMELINE_BASED:
+ return new TimelineBasedMarkerFiles(table, instantTime);
Review comment:
Fixed.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
##########
@@ -56,8 +58,8 @@ public SparkMarkerBasedRollbackStrategy(HoodieTable<T,
JavaRDD<HoodieRecord<T>>,
public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
try {
- MarkerFiles markerFiles = new MarkerFiles(table,
instantToRollback.getTimestamp());
- List<String> markerFilePaths = markerFiles.allMarkerFilePaths();
+ MarkerFiles markerFiles =
MarkerFilesFactory.get(config.getMarkersIOMode(), table,
instantToRollback.getTimestamp());
+ List<String> markerFilePaths = new
ArrayList<>(markerFiles.allMarkerFilePaths());
Review comment:
The Set instance is converted to a List instance here, and I don't see a
corresponding API for doing that conversion directly.
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
##########
@@ -118,8 +133,118 @@ public TimelineService(Config config) throws IOException {
@Parameter(names = {"--compress"}, description = "Compress output using
gzip")
public boolean compress = true;
+ @Parameter(names = {"--marker-batch-threads", "-mbt"}, description =
"Number of threads to use for batch processing marker creation requests")
+ public int markerBatchNumThreads = 20;
+
+ @Parameter(names = {"--marker-batch-interval-ms", "-mbi"}, description =
"The interval in milliseconds between two batch processing of marker creation
requests")
+ public long markerBatchIntervalMs = 50;
+
+ @Parameter(names = {"--marker-parallelism", "-mdp"}, description =
"Parallelism to use for reading and deleting marker files")
+ public int markerParallelism = 100;
+
@Parameter(names = {"--help", "-h"})
public Boolean help = false;
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder of Config class.
+ */
+ public static class Builder {
+ private Integer serverPort = 26754;
+ private FileSystemViewStorageType viewStorageType =
FileSystemViewStorageType.SPILLABLE_DISK;
+ private Integer maxViewMemPerTableInMB = 2048;
+ private Double memFractionForCompactionPerTable = 0.001;
+ private String baseStorePathForFileGroups =
FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue();
+ private String rocksDBPath =
FileSystemViewStorageConfig.ROCKSDB_BASE_PATH_PROP.defaultValue();
+ private int numThreads = DEFAULT_NUM_THREADS;
+ private boolean async = false;
+ private boolean compress = true;
+ private int markerBatchNumThreads = 20;
Review comment:
I now keep only one constructor, so that we can rely on the Config class to
store the default values.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectMarkerFiles.java
##########
@@ -175,70 +164,30 @@ public static String stripMarkerSuffix(String path) {
return markerFiles;
}
- private String stripMarkerFolderPrefix(String fullMarkerPath) {
-
ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
- String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
- new Path(String.format("%s/%s/%s", basePath,
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
- int begin = fullMarkerPath.indexOf(markerRootPath);
- ValidationUtils.checkArgument(begin >= 0,
- "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected
Marker Root=" + markerRootPath);
- return fullMarkerPath.substring(begin + markerRootPath.length() + 1);
- }
-
- /**
- * The marker path will be
<base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
- */
- public Path create(String partitionPath, String dataFileName, IOType type) {
+ @Override
+ protected Option<Path> create(String partitionPath, String dataFileName,
IOType type, boolean checkIfExists) {
+ LOG.info("[direct] Create marker file : " + partitionPath + " " +
dataFileName);
+ long startTimeMs = System.currentTimeMillis();
Review comment:
Fixed.
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
##########
@@ -118,8 +133,118 @@ public TimelineService(Config config) throws IOException {
@Parameter(names = {"--compress"}, description = "Compress output using
gzip")
public boolean compress = true;
+ @Parameter(names = {"--marker-batch-threads", "-mbt"}, description =
"Number of threads to use for batch processing marker creation requests")
+ public int markerBatchNumThreads = 20;
+
+ @Parameter(names = {"--marker-batch-interval-ms", "-mbi"}, description =
"The interval in milliseconds between two batch processing of marker creation
requests")
+ public long markerBatchIntervalMs = 50;
+
+ @Parameter(names = {"--marker-parallelism", "-mdp"}, description =
"Parallelism to use for reading and deleting marker files")
+ public int markerParallelism = 100;
+
@Parameter(names = {"--help", "-h"})
public Boolean help = false;
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder of Config class.
+ */
+ public static class Builder {
+ private Integer serverPort = 26754;
+ private FileSystemViewStorageType viewStorageType =
FileSystemViewStorageType.SPILLABLE_DISK;
+ private Integer maxViewMemPerTableInMB = 2048;
+ private Double memFractionForCompactionPerTable = 0.001;
+ private String baseStorePathForFileGroups =
FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue();
+ private String rocksDBPath =
FileSystemViewStorageConfig.ROCKSDB_BASE_PATH_PROP.defaultValue();
+ private int numThreads = DEFAULT_NUM_THREADS;
+ private boolean async = false;
+ private boolean compress = true;
+ private int markerBatchNumThreads = 20;
+ private long markerBatchIntervalMs = 50;
+ private int markerParallelism = 100;
+
+ public Builder() {}
+
+ public Builder serverPort(int serverPort) {
+ this.serverPort = serverPort;
+ return this;
+ }
+
+ public Builder viewStorageType(FileSystemViewStorageType
viewStorageType) {
+ this.viewStorageType = viewStorageType;
+ return this;
+ }
+
+ public Builder maxViewMemPerTableInMB(int maxViewMemPerTableInMB) {
+ this.maxViewMemPerTableInMB = maxViewMemPerTableInMB;
+ return this;
+ }
+
+ public Builder memFractionForCompactionPerTable(double
memFractionForCompactionPerTable) {
+ this.memFractionForCompactionPerTable =
memFractionForCompactionPerTable;
+ return this;
+ }
+
+ public Builder baseStorePathForFileGroups(String
baseStorePathForFileGroups) {
+ this.baseStorePathForFileGroups = baseStorePathForFileGroups;
+ return this;
+ }
+
+ public Builder rocksDBPath(String rocksDBPath) {
+ this.rocksDBPath = rocksDBPath;
+ return this;
+ }
+
+ public Builder numThreads(int numThreads) {
+ this.numThreads = numThreads;
+ return this;
+ }
+
+ public Builder async(boolean async) {
+ this.async = async;
+ return this;
+ }
+
+ public Builder compress(boolean compress) {
+ this.compress = compress;
+ return this;
+ }
+
+ public Builder markerBatchNumThreads(int markerBatchNumThreads) {
+ this.markerBatchNumThreads = markerBatchNumThreads;
+ return this;
+ }
+
+ public Builder markerBatchIntervalMs(long markerBatchIntervalMs) {
+ this.markerBatchIntervalMs = markerBatchIntervalMs;
+ return this;
+ }
+
+ public Builder markerParallelism(int markerParallelism) {
+ this.markerParallelism = markerParallelism;
+ return this;
+ }
+
+ public Config build() {
Review comment:
Yes, to make the style look better. Ideally, we want to avoid public
member variables in the `Config` class. I'll leave it as is for now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]