[
https://issues.apache.org/jira/browse/HUDI-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17376801#comment-17376801
]
ASF GitHub Bot commented on HUDI-1138:
--------------------------------------
nsivabalan commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r665610554
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
##########
@@ -84,7 +88,10 @@ private FileSystemViewManager createViewManager() {
}
public void startServer() throws IOException {
-    server = new TimelineService(preferredPort, viewManager, hadoopConf.newCopy(), numThreads, shouldCompressOutput, useAsync);
+    server = new TimelineService(preferredPort, viewManager,
+        FSUtils.getFs(basePath, hadoopConf.newCopy()), hadoopConf.newCopy(), numThreads, shouldCompressOutput, useAsync,
Review comment:
Can we send the writeConfig itself to TimelineService, so that we don't need to pass N individual args?
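The suggestion above can be sketched as follows. `TimelineServiceConfig` and `TimelineServiceSketch` are hypothetical stand-ins (in Hudi the bundle would be `HoodieWriteConfig` or a dedicated config class); the point is only to show N constructor arguments collapsing into one object.

```java
// Hypothetical sketch of the reviewer's suggestion: bundle the N individual
// TimelineService constructor arguments into a single config object.
// Names below are illustrative, not Hudi's actual API.
class TimelineServiceConfig {
    final int preferredPort;
    final int numThreads;
    final boolean shouldCompressOutput;
    final boolean useAsync;

    TimelineServiceConfig(int preferredPort, int numThreads,
                          boolean shouldCompressOutput, boolean useAsync) {
        this.preferredPort = preferredPort;
        this.numThreads = numThreads;
        this.shouldCompressOutput = shouldCompressOutput;
        this.useAsync = useAsync;
    }
}

class TimelineServiceSketch {
    private final TimelineServiceConfig config;

    // One argument instead of N; adding a new setting later
    // does not change this signature.
    TimelineServiceSketch(TimelineServiceConfig config) {
        this.config = config;
    }

    int port() {
        return config.preferredPort;
    }
}
```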
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -210,6 +212,21 @@
.defaultValue("1500")
.withDocumentation("");
+ public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+ .key("hoodie.markers.io.mode")
+ .defaultValue(MarkerIOMode.TIMELINE_BASED.toString())
Review comment:
also add min version.
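For context, here is a minimal self-contained mock of a `ConfigProperty`-style fluent builder showing where such a version tag could slot in; the class and method names below are assumptions for illustration, not Hudi's exact API.

```java
// Self-contained mock of a ConfigProperty-style builder, illustrating the
// reviewer's ask: record the release in which a config key was introduced.
// Class and method names are illustrative, not Hudi's exact API.
class ConfigPropertySketch<T> {
    private final String key;
    private T defaultValue;
    private String sinceVersion;
    private String documentation;

    private ConfigPropertySketch(String key) {
        this.key = key;
    }

    static <T> ConfigPropertySketch<T> key(String key) {
        return new ConfigPropertySketch<>(key);
    }

    ConfigPropertySketch<T> defaultValue(T value) {
        this.defaultValue = value;
        return this;
    }

    // the "min version" tag the reviewer asks for
    ConfigPropertySketch<T> sinceVersion(String version) {
        this.sinceVersion = version;
        return this;
    }

    ConfigPropertySketch<T> withDocumentation(String doc) {
        this.documentation = doc;
        return this;
    }

    String getKey() {
        return key;
    }

    String getSinceVersion() {
        return sinceVersion;
    }
}
```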
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -54,6 +54,7 @@
import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.marker.MarkerFiles;
Review comment:
revert unintended changes
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+public class MarkerHandler extends Handler {
+ public static final String MARKERS_FILENAME = "MARKERS";
Review comment:
nit: MARKERS_FILENAME_PREFIX
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+public class MarkerHandler extends Handler {
+ public static final String MARKERS_FILENAME = "MARKERS";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+ private static final long MARGIN_TIME_MS = 10L;
+
+ private final Registry metricsRegistry;
+ private final ScheduledExecutorService executorService;
+ // {markerDirPath -> all markers}
+ private final Map<String, Set<String>> allMarkersMap = new HashMap<>();
+ // {markerDirPath -> {markerFileIndex -> markers}}
+ private final Map<String, Map<Integer, StringBuilder>> fileMarkersMap = new HashMap<>();
+ private final List<CreateMarkerCompletableFuture> createMarkerFutures = new ArrayList<>();
+ private final List<Boolean> isMarkerFileInUseList;
+ private final long batchIntervalMs;
+ private final int parallelism;
+ private volatile Object createMarkerRequestlockObject = new Object();
+ private long nextBatchProcessTimeMs = 0L;
+
+ public MarkerHandler(Configuration conf, FileSystem fileSystem, FileSystemViewManager viewManager, Registry metricsRegistry,
+     int batchNumThreads, long batchIntervalMs, int parallelism) throws IOException {
+ super(conf, fileSystem, viewManager);
+ LOG.info("*** MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+ LOG.info("*** MarkerHandler Params: batchNumThreads=" + batchNumThreads + " batchIntervalMs=" + batchIntervalMs + "ms");
+ this.metricsRegistry = metricsRegistry;
+ this.batchIntervalMs = batchIntervalMs;
+ this.parallelism = parallelism;
+ this.executorService = Executors.newScheduledThreadPool(batchNumThreads);
+ List<Boolean> isMarkerFileInUseList = new ArrayList<>(batchNumThreads);
+ for (int i = 0; i < batchNumThreads; i++) {
+ isMarkerFileInUseList.add(false);
+ }
+ this.isMarkerFileInUseList = Collections.synchronizedList(isMarkerFileInUseList);
+ }
+
+ public Set<String> getAllMarkers(String markerDirPath) {
+ return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>());
+ }
+
+ public Set<String> getCreateAndMergeMarkers(String markerDirPath) {
+ return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>()).stream()
+ .filter(markerName -> !markerName.endsWith(IOType.APPEND.name()))
+ .collect(Collectors.toSet());
+ }
+
+ public CompletableFuture<String> createMarker(Context context, String markerDirPath, String markerName) {
+ LOG.info("Request: create marker " + markerDirPath + " " + markerName);
+ CreateMarkerCompletableFuture future = new CreateMarkerCompletableFuture(context, markerDirPath, markerName);
+ synchronized (createMarkerRequestlockObject) {
+ createMarkerFutures.add(future);
+ long currTimeMs = System.currentTimeMillis();
+ if (currTimeMs >= nextBatchProcessTimeMs - MARGIN_TIME_MS) {
+ nextBatchProcessTimeMs += batchIntervalMs * (Math.max((currTimeMs - nextBatchProcessTimeMs) / batchIntervalMs + 1L, 1L));
+
+ long waitMs = nextBatchProcessTimeMs - System.currentTimeMillis();
+ executorService.schedule(
+ new BatchCreateMarkerRunnable(), Math.max(0L, waitMs), TimeUnit.MILLISECONDS);
+ LOG.info("Wait for " + waitMs + " ms, next batch time: " + nextBatchProcessTimeMs);
+ }
+ }
+ return future;
+ }
+
+ public Boolean deleteMarkers(String markerDir) {
+ Path markerDirPath = new Path(markerDir);
+ try {
+ if (fileSystem.exists(markerDirPath)) {
+ FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+ List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+ .map(fileStatus -> fileStatus.getPath().toString())
+ .collect(Collectors.toList());
+
+ if (markerDirSubPaths.size() > 0) {
+ for (String subPathStr: markerDirSubPaths) {
+ fileSystem.delete(new Path(subPathStr), true);
+ }
+ }
+
+ boolean result = fileSystem.delete(markerDirPath, true);
+ LOG.info("Removing marker directory at " + markerDirPath);
+ return result;
+ }
+ } catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ return false;
+ }
+
+ private Set<String> getAllMarkersFromFile(String markerDirPath) {
+ LOG.info("Get all markers from " + markerDirPath);
+ Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME);
+ Set<String> markers = new HashSet<>();
+ try {
+ if (fileSystem.exists(markersFilePath)) {
Review comment:
Did we fix this after fixing for N marker files?
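The concern above is that reading from a single `MARKERS` file would miss the other batch files once markers are sharded into `MARKERS0..N-1`. A local-filesystem sketch of the prefix-based scan this would require (java.nio stands in here for Hadoop's `FileSystem` API; names are illustrative):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

// Sketch of the fix the reviewer is asking about: once markers are written to
// N files (MARKERS0, MARKERS1, ...), reads must scan every file with the
// prefix rather than a single "MARKERS" file. Local java.nio stands in for
// Hadoop's FileSystem here.
final class MarkerReaderSketch {
    static Set<String> readAllMarkers(Path markerDir, String prefix) throws IOException {
        Set<String> markers = new HashSet<>();
        // glob matches MARKERS0, MARKERS1, ... under the marker directory
        try (DirectoryStream<Path> files =
                 Files.newDirectoryStream(markerDir, prefix + "*")) {
            for (Path file : files) {
                // each marker file holds one marker name per line
                markers.addAll(Files.readAllLines(file));
            }
        }
        return markers;
    }
}
```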
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+ private Set<String> getAllMarkersFromFile(String markerDirPath) {
+ LOG.info("Get all markers from " + markerDirPath);
+ Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME);
+ Set<String> markers = new HashSet<>();
+ try {
+ if (fileSystem.exists(markersFilePath)) {
+ LOG.info("Marker file exists: " + markersFilePath.toString());
+ FSDataInputStream fsDataInputStream = fileSystem.open(markersFilePath);
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
+ markers = bufferedReader.lines().collect(Collectors.toSet());
+ bufferedReader.close();
+ fsDataInputStream.close();
+ } else {
+ LOG.info("Marker file not exist: " + markersFilePath.toString());
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read MARKERS file " + markerDirPath, e);
+ }
+ return markers;
+ }
+
+ private class CreateMarkerCompletableFuture extends CompletableFuture<String> {
+ private Context context;
+ private final String markerDirPath;
+ private final String markerName;
+ private boolean result;
+ private final long startTimeMs;
+
+ public CreateMarkerCompletableFuture(Context context, String markerDirPath, String markerName) {
+ super();
+ this.startTimeMs = System.currentTimeMillis();
+ this.context = context;
+ this.markerDirPath = markerDirPath;
+ this.markerName = markerName;
+ this.result = false;
+ }
+
+ public Context getContext() {
+ return context;
+ }
+
+ public String getMarkerDirPath() {
+ return markerDirPath;
+ }
+
+ public String getMarkerName() {
+ return markerName;
+ }
+
+ public boolean getResult() {
+ return result;
+ }
+
+ public void setResult(boolean result) {
+ LOG.info("*** Request queued for " + (System.currentTimeMillis() - startTimeMs) + " ms");
+ this.result = result;
+ }
+ }
+
+ private class BatchCreateMarkerRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ LOG.info("Start processing create marker requests");
+ long startTimeMs = System.currentTimeMillis();
+ List<CreateMarkerCompletableFuture> futuresToRemove = new ArrayList<>();
+ Set<String> updatedMarkerDirPaths = new HashSet<>();
+ int markerFileIndex = -1;
+ synchronized (isMarkerFileInUseList) {
+ for (int i = 0; i < isMarkerFileInUseList.size(); i++) {
+ if (!isMarkerFileInUseList.get(i)) {
+ markerFileIndex = i;
+ isMarkerFileInUseList.set(i, true);
+ break;
+ }
+ }
+ if (markerFileIndex < 0) {
+ LOG.info("All marker files are busy, skip batch processing of create marker requests in " + (System.currentTimeMillis() - startTimeMs) + " ms");
+ return;
+ }
+ }
+
+ LOG.info("timeMs=" + System.currentTimeMillis() + " markerFileIndex=" + markerFileIndex);
+ synchronized (createMarkerRequestlockObject) {
+ LOG.info("Iterating through existing requests, size=" + createMarkerFutures.size());
+ for (CreateMarkerCompletableFuture future : createMarkerFutures) {
+ String markerDirPath = future.getMarkerDirPath();
+ String markerName = future.getMarkerName();
+ LOG.info("--- markerDirPath=" + markerDirPath + " markerName=" + markerName);
+ Set<String> allMarkers = allMarkersMap.computeIfAbsent(markerDirPath, k -> new HashSet<>());
+ StringBuilder stringBuilder = fileMarkersMap.computeIfAbsent(markerDirPath, k -> new HashMap<>())
+ .computeIfAbsent(markerFileIndex, k -> new StringBuilder(16384));
+ boolean exists = allMarkers.contains(markerName);
+ if (!exists) {
+ allMarkers.add(markerName);
+ stringBuilder.append(markerName);
+ stringBuilder.append('\n');
+ updatedMarkerDirPaths.add(markerDirPath);
+ }
+ future.setResult(!exists);
+ futuresToRemove.add(future);
+ }
+ createMarkerFutures.removeAll(futuresToRemove);
+ }
+ LOG.info("Flush to MARKERS file .. ");
+ flushMarkersToFile(updatedMarkerDirPaths, markerFileIndex);
+ isMarkerFileInUseList.set(markerFileIndex, false);
+ LOG.info("Resolve request futures .. ");
+ for (CreateMarkerCompletableFuture future : futuresToRemove) {
+ try {
+ future.complete(jsonifyResult(future.getContext(), future.getResult(), metricsRegistry, OBJECT_MAPPER, LOG));
+ } catch (JsonProcessingException e) {
+ throw new HoodieException("Failed to JSON encode the value", e);
+ }
+ }
+ LOG.info("Finish batch processing of create marker requests in " + (System.currentTimeMillis() - startTimeMs) + " ms");
+ }
+
+ private void flushMarkersToFile(Set<String> updatedMarkerDirPaths, int markerFileIndex) {
+ long flushStartTimeMs = System.currentTimeMillis();
+ for (String markerDirPath : updatedMarkerDirPaths) {
+ LOG.info("Write to " + markerDirPath);
+ long startTimeMs = System.currentTimeMillis();
+ Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME + markerFileIndex);
+ Path dirPath = markersFilePath.getParent();
+ try {
+ if (!fileSystem.exists(dirPath)) {
+ fileSystem.mkdirs(dirPath);
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to make dir " + dirPath, e);
+ }
+ try {
+ LOG.info("Create " + markersFilePath.toString());
+ FSDataOutputStream fsDataOutputStream = fileSystem.create(markersFilePath);
+
+ BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
+ bufferedWriter.write(fileMarkersMap.get(markerDirPath).get(markerFileIndex).toString());
+ bufferedWriter.close();
Review comment:
close these from within finally block
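Rather than an explicit finally block, try-with-resources is the idiomatic way to guarantee the writer (and the stream it wraps) is closed even when `write()` throws. A self-contained sketch, with local files standing in for `FSDataOutputStream`:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrates the reviewer's ask: streams must be closed on all paths.
// With try-with-resources, the writer and its underlying stream are closed
// automatically even if write() throws, which an unguarded close() misses.
final class MarkerFlushSketch {
    static void flushMarkers(Path markersFile, String markers) throws IOException {
        try (BufferedWriter writer =
                 Files.newBufferedWriter(markersFile, StandardCharsets.UTF_8)) {
            writer.write(markers);
        } // closed here on both success and exception
    }
}
```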
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -210,6 +212,21 @@
.defaultValue("1500")
.withDocumentation("");
+ public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+ .key("hoodie.markers.io.mode")
+ .defaultValue(MarkerIOMode.TIMELINE_BASED.toString())
Review comment:
We are very close to the release. not sure if we make timeline based as
the default. I vote for going w/ "DIRECT" and in next release we can make
timeline based as default.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
##########
@@ -185,4 +186,12 @@
* Filegroups that are in pending clustering.
*/
Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering();
+
+ Set<String> getAllMarkerFilePaths(String markerDirPath);
Review comment:
Its good to add java docs, especially public interfaces.
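One way the requested Javadoc could read; the wording below is a suggestion, not taken from the PR, and the interface name is a stand-in for `TableFileSystemView`:

```java
import java.util.Set;

// Sketch of the Javadoc the reviewer asks for on the new public interface
// method; the description text is illustrative.
interface MarkerViewSketch {
    /**
     * Returns the paths of all marker files under the given marker directory.
     *
     * @param markerDirPath marker directory for an instant (under .hoodie/.temp)
     * @return fully-qualified paths of every marker file found
     */
    Set<String> getAllMarkerFilePaths(String markerDirPath);
}
```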
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
##########
@@ -49,6 +50,7 @@
private static final Logger LOG = LoggerFactory.getLogger(FlinkAppendHandle.class);
private boolean isClosed = false;
+ private MarkerFiles markerFiles;
Review comment:
final
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -210,6 +212,21 @@
.defaultValue("1500")
.withDocumentation("");
+ public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+ .key("hoodie.markers.io.mode")
+ .defaultValue(MarkerIOMode.TIMELINE_BASED.toString())
+ .withDocumentation("");
+
+ public static final ConfigProperty<Integer> MARKERS_TIMELINE_BASED_BATCH_THREAD = ConfigProperty
Review comment:
end with something like num_threads or thread_count.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
##########
@@ -511,4 +511,24 @@ public void close() {
boolean isClosed() {
return closed;
}
+
+ @Override
+ public Set<String> getAllMarkerFilePaths(String markerDirPath) {
Review comment:
I see these are repeated in quite a few classes. Can we move this to one of the base abstract classes, so that we don't need to keep repeating it in every derived class?
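The refactor being suggested, sketched with stand-in class names (Hudi's actual hierarchy would be the abstract base view and its RocksDB/spillable/in-memory derivatives):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the deduplication the reviewer suggests: implement the shared
// method exactly once in the abstract base, and let each derived view
// inherit it. Class names are illustrative stand-ins.
abstract class AbstractViewSketch {
    // shared implementation lives here once
    Set<String> getAllMarkerFilePaths(String markerDirPath) {
        Set<String> paths = new HashSet<>();
        for (String name : listMarkerFileNames(markerDirPath)) {
            paths.add(markerDirPath + "/" + name);
        }
        return paths;
    }

    // only the storage-specific listing differs per derived class
    protected abstract Set<String> listMarkerFileNames(String markerDirPath);
}

class RocksDbViewSketch extends AbstractViewSketch {
    @Override
    protected Set<String> listMarkerFileNames(String markerDirPath) {
        // hard-coded for illustration; the real class would query its store
        return new HashSet<>(java.util.Arrays.asList("f1.marker.CREATE"));
    }
}
```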
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java
##########
@@ -47,8 +49,9 @@ public JavaMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, Lis
@Override
public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
try {
-      MarkerFiles markerFiles = new MarkerFiles(table, instantToRollback.getTimestamp());
-      List<HoodieRollbackStat> rollbackStats = context.map(markerFiles.allMarkerFilePaths(), markerFilePath -> {
+      MarkerFiles directMarkerFiles =
Review comment:
can we name the variable just "markerFiles".
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+ public Boolean deleteMarkers(String markerDir) {
+ Path markerDirPath = new Path(markerDir);
+ try {
+ if (fileSystem.exists(markerDirPath)) {
+ FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+ List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+ .map(fileStatus -> fileStatus.getPath().toString())
+ .collect(Collectors.toList());
+
+ if (markerDirSubPaths.size() > 0) {
+ for (String subPathStr: markerDirSubPaths) {
+ fileSystem.delete(new Path(subPathStr), true);
+ }
+ }
+
+ boolean result = fileSystem.delete(markerDirPath, true);
Review comment:
Is there a reason to delete the sub-directories explicitly? Wouldn't deleting
the root marker directory recursively suffice on its own? Can you please help
me understand?
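To illustrate the point: Hadoop's FileSystem.delete(path, true) already removes children recursively, so the explicit loop over sub-paths should be redundant. A self-contained local-filesystem analogue (java.nio has no one-call recursive delete, so the hypothetical helper below walks the tree itself):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Local-filesystem analogue: one recursive delete of the root covers every
// sub-path, so a per-child loop before deleting the root adds nothing.
public class RecursiveDeleteSketch {
  public static boolean deleteRecursively(Path root) throws IOException {
    if (!Files.exists(root)) {
      return false;
    }
    try (Stream<Path> paths = Files.walk(root)) {
      paths.sorted(Comparator.reverseOrder()) // children before parents
          .forEach(p -> {
            try {
              Files.delete(p);
            } catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          });
    }
    return true;
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("markers");
    Files.createFile(root.resolve("marker1.CREATE"));
    Files.createDirectory(root.resolve("sub"));
    Files.createFile(root.resolve("sub").resolve("marker2.MERGE"));
    System.out.println(deleteRecursively(root)); // prints true
    System.out.println(Files.exists(root));      // prints false
  }
}
```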
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+public class MarkerHandler extends Handler {
+ public static final String MARKERS_FILENAME = "MARKERS";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+ private static final long MARGIN_TIME_MS = 10L;
+
+ private final Registry metricsRegistry;
+ private final ScheduledExecutorService executorService;
+ // {markerDirPath -> all markers}
+ private final Map<String, Set<String>> allMarkersMap = new HashMap<>();
+ // {markerDirPath -> {markerFileIndex -> markers}}
+ private final Map<String, Map<Integer, StringBuilder>> fileMarkersMap = new
HashMap<>();
+ private final List<CreateMarkerCompletableFuture> createMarkerFutures = new
ArrayList<>();
+ private final List<Boolean> isMarkerFileInUseList;
Review comment:
maybe rename to: markerFilesUseStatus?
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+ private final long batchIntervalMs;
+ private final int parallelism;
+ private volatile Object createMarkerRequestlockObject = new Object();
+ private long nextBatchProcessTimeMs = 0L;
+
+ public MarkerHandler(Configuration conf, FileSystem fileSystem,
FileSystemViewManager viewManager, Registry metricsRegistry,
+ int batchNumThreads, long batchIntervalMs, int
parallelism) throws IOException {
+ super(conf, fileSystem, viewManager);
+ LOG.info("*** MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+ LOG.info("*** MarkerHandler Params: batchNumThreads=" + batchNumThreads +
" batchIntervalMs=" + batchIntervalMs + "ms");
+ this.metricsRegistry = metricsRegistry;
+ this.batchIntervalMs = batchIntervalMs;
+ this.parallelism = parallelism;
+ this.executorService = Executors.newScheduledThreadPool(batchNumThreads);
Review comment:
can we try a cached thread pool?
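One wrinkle with this suggestion: Executors.newCachedThreadPool() returns a plain ExecutorService without schedule(), which the batching above relies on. A sketch of one way to combine the two (class and method names are hypothetical, not part of the PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// A minimal single-threaded scheduler fires each batch at its deadline,
// while a cached pool supplies elastic worker threads for the actual work.
public class ScheduledHandoffSketch {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final ExecutorService workers = Executors.newCachedThreadPool();

  public void scheduleBatch(Runnable batch, long delayMs) {
    // The scheduler triggers at the deadline; the cached pool runs the batch.
    scheduler.schedule(() -> {
      workers.submit(batch);
    }, delayMs, TimeUnit.MILLISECONDS);
  }

  public void shutdown() {
    scheduler.shutdown();
    workers.shutdown();
  }
}
```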
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+
+ public Boolean deleteMarkers(String markerDir) {
+ Path markerDirPath = new Path(markerDir);
+ try {
+ if (fileSystem.exists(markerDirPath)) {
+ FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+ List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+ .map(fileStatus -> fileStatus.getPath().toString())
+ .collect(Collectors.toList());
+
+ if (markerDirSubPaths.size() > 0) {
+ for (String subPathStr: markerDirSubPaths) {
+ fileSystem.delete(new Path(subPathStr), true);
+ }
+ }
+
+ boolean result = fileSystem.delete(markerDirPath, true);
+ LOG.info("Removing marker directory at " + markerDirPath);
+ return result;
+ }
+ } catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ return false;
+ }
+
+ private Set<String> getAllMarkersFromFile(String markerDirPath) {
+ LOG.info("Get all markers from " + markerDirPath);
+ Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME);
+ Set<String> markers = new HashSet<>();
+ try {
+ if (fileSystem.exists(markersFilePath)) {
+ LOG.info("Marker file exists: " + markersFilePath.toString());
+ FSDataInputStream fsDataInputStream = fileSystem.open(markersFilePath);
+ BufferedReader bufferedReader = new BufferedReader(new
InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
+ markers = bufferedReader.lines().collect(Collectors.toSet());
+ bufferedReader.close();
+ fsDataInputStream.close();
+ } else {
+ LOG.info("Marker file not exist: " + markersFilePath.toString());
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read MARKERS file " +
markerDirPath, e);
+ }
+ return markers;
+ }
+
+ private class CreateMarkerCompletableFuture extends
CompletableFuture<String> {
+ private Context context;
Review comment:
final
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
##########
@@ -392,6 +401,38 @@ private void registerFileSlicesAPI() {
}, true));
}
+ private void registerMarkerAPI() {
+ app.get(RemoteHoodieTableFileSystemView.ALL_MARKERS_URL, new
ViewHandler(ctx -> {
+ metricsRegistry.add("ALL_MARKERS", 1);
+ Set<String> markers = markerHandler.getAllMarkers(
+
ctx.queryParam(RemoteHoodieTableFileSystemView.MARKER_DIR_PATH_PARAM, ""));
+ writeValueAsString(ctx, markers);
+ }, false));
+
+ app.get(RemoteHoodieTableFileSystemView.CREATE_AND_MERGE_MARKERS_URL, new
ViewHandler(ctx -> {
+ metricsRegistry.add("CREATE_AND_MERGE_MARKERS", 1);
+ Set<String> markers = markerHandler.getCreateAndMergeMarkers(
+
ctx.queryParam(RemoteHoodieTableFileSystemView.MARKER_DIR_PATH_PARAM, ""));
+ writeValueAsString(ctx, markers);
+ }, false));
+
+ app.post(RemoteHoodieTableFileSystemView.CREATE_MARKER_URL, new
ViewHandler(ctx -> {
+ metricsRegistry.add("CREATE_MARKER", 1);
+ ctx.result(markerHandler.createMarker(
+ ctx,
+
ctx.queryParam(RemoteHoodieTableFileSystemView.MARKER_DIR_PATH_PARAM, ""),
+ ctx.queryParam(RemoteHoodieTableFileSystemView.MARKER_NAME_PARAM,
"")));
+ //writeValueAsString(ctx, success);
Review comment:
why commented out?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
##########
@@ -443,6 +452,51 @@ public boolean refresh() {
}
}
+ @Override
+ public Set<String> getAllMarkerFilePaths(String markerDirPath) {
+ Map<String, String> paramsMap = new HashMap<>();
+ paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath);
+ try {
+ return executeRequest(ALL_MARKERS_URL, paramsMap, new
TypeReference<Set<String>>() {}, RequestMethod.GET);
Review comment:
Probably we could directly use
Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath).
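A quick illustration of the suggestion (key and value below are placeholders standing in for MARKER_DIR_PATH_PARAM and markerDirPath):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Collections.singletonMap builds an immutable one-entry map in a single
// expression, equal to the two-step HashMap construction it replaces.
public class SingletonMapSketch {
  static Map<String, String> singleParam(String key, String value) {
    return Collections.singletonMap(key, value);
  }

  public static void main(String[] args) {
    Map<String, String> viaHashMap = new HashMap<>();
    viaHashMap.put("markerdirpath", "/tmp/table/.hoodie/.temp/001");

    Map<String, String> viaSingleton = singleParam("markerdirpath", "/tmp/table/.hoodie/.temp/001");
    System.out.println(viaHashMap.equals(viaSingleton)); // prints true
  }
}
```

Note the singleton map is immutable, so this only works if the params map is never mutated afterwards.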
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+
+ private class CreateMarkerCompletableFuture extends
CompletableFuture<String> {
+ private Context context;
+ private final String markerDirPath;
+ private final String markerName;
+ private boolean result;
+ private final long startTimeMs;
+
+ public CreateMarkerCompletableFuture(Context context, String
markerDirPath, String markerName) {
+ super();
+ this.startTimeMs = System.currentTimeMillis();
+ this.context = context;
+ this.markerDirPath = markerDirPath;
+ this.markerName = markerName;
+ this.result = false;
+ }
+
+ public Context getContext() {
+ return context;
+ }
+
+ public String getMarkerDirPath() {
+ return markerDirPath;
+ }
+
+ public String getMarkerName() {
+ return markerName;
+ }
+
+ public boolean getResult() {
+ return result;
+ }
+
+ public void setResult(boolean result) {
+ LOG.info("*** Request queued for " + (System.currentTimeMillis() -
startTimeMs) + " ms");
+ this.result = result;
+ }
+ }
+
+ private class BatchCreateMarkerRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ LOG.info("Start processing create marker requests");
+ long startTimeMs = System.currentTimeMillis();
+ List<CreateMarkerCompletableFuture> futuresToRemove = new ArrayList<>();
+ Set<String> updatedMarkerDirPaths = new HashSet<>();
+ int markerFileIndex = -1;
+ synchronized (isMarkerFileInUseList) {
+ for (int i = 0; i < isMarkerFileInUseList.size(); i++) {
+ if (!isMarkerFileInUseList.get(i)) {
+ markerFileIndex = i;
+ isMarkerFileInUseList.set(i, true);
+ break;
+ }
+ }
+ if (markerFileIndex < 0) {
+ LOG.info("All marker files are busy, skip batch processing of create
marker requests in " + (System.currentTimeMillis() - startTimeMs) + " ms");
+ return;
+ }
+ }
+
+ LOG.info("timeMs=" + System.currentTimeMillis() + " markerFileIndex=" +
markerFileIndex);
+ synchronized (createMarkerRequestlockObject) {
+ LOG.info("Iterating through existing requests, size=" +
createMarkerFutures.size());
+ for (CreateMarkerCompletableFuture future : createMarkerFutures) {
+ String markerDirPath = future.getMarkerDirPath();
+ String markerName = future.getMarkerName();
+ LOG.info("--- markerDirPath=" + markerDirPath + " markerName=" +
markerName);
+ Set<String> allMarkers =
allMarkersMap.computeIfAbsent(markerDirPath, k -> new HashSet<>());
+ StringBuilder stringBuilder =
fileMarkersMap.computeIfAbsent(markerDirPath, k -> new HashMap<>())
+ .computeIfAbsent(markerFileIndex, k -> new StringBuilder(16384));
Review comment:
what happens if it grows beyond the allocated size?
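To the question above: nothing breaks. The 16384 passed to the StringBuilder is only an initial capacity hint; the backing array grows automatically on demand, at the cost of an occasional re-allocation and copy. A small demonstration (hypothetical helper name):

```java
// Appends n marker-style lines to a builder with a deliberately tiny initial
// capacity and returns the final length, showing the builder grows on its own.
public class StringBuilderGrowthSketch {
  static int buildLength(int n) {
    StringBuilder sb = new StringBuilder(8); // capacity hint only
    for (int i = 0; i < n; i++) {
      sb.append("marker_").append(i).append('\n');
    }
    return sb.length();
  }

  public static void main(String[] args) {
    System.out.println(buildLength(1000)); // prints 10890, far beyond 8
  }
}
```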
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+ fsDataInputStream.close();
+ } else {
+ LOG.info("Marker file not exist: " + markersFilePath.toString());
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read MARKERS file " +
markerDirPath, e);
+ }
+ return markers;
+ }
+
+ private class CreateMarkerCompletableFuture extends
CompletableFuture<String> {
+ private Context context;
+ private final String markerDirPath;
+ private final String markerName;
+ private boolean result;
+ private final long startTimeMs;
+
+ public CreateMarkerCompletableFuture(Context context, String
markerDirPath, String markerName) {
+ super();
+ this.startTimeMs = System.currentTimeMillis();
+ this.context = context;
+ this.markerDirPath = markerDirPath;
+ this.markerName = markerName;
+ this.result = false;
+ }
+
+ public Context getContext() {
+ return context;
+ }
+
+ public String getMarkerDirPath() {
+ return markerDirPath;
+ }
+
+ public String getMarkerName() {
+ return markerName;
+ }
+
+ public boolean getResult() {
+ return result;
+ }
+
+ public void setResult(boolean result) {
+ LOG.info("*** Request queued for " + (System.currentTimeMillis() -
startTimeMs) + " ms");
+ this.result = result;
+ }
+ }
+
+ private class BatchCreateMarkerRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ LOG.info("Start processing create marker requests");
+ long startTimeMs = System.currentTimeMillis();
+ List<CreateMarkerCompletableFuture> futuresToRemove = new ArrayList<>();
+ Set<String> updatedMarkerDirPaths = new HashSet<>();
Review comment:
for a given commit, can we have multiple entries in this set? in other
words, I thought all create requests will have same marker directory path for a
given commit.
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
+ private Set<String> getAllMarkersFromFile(String markerDirPath) {
+ LOG.info("Get all markers from " + markerDirPath);
+ Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME);
+ Set<String> markers = new HashSet<>();
+ try {
+ if (fileSystem.exists(markersFilePath)) {
+ LOG.info("Marker file exists: " + markersFilePath.toString());
+ FSDataInputStream fsDataInputStream = fileSystem.open(markersFilePath);
+        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
+ markers = bufferedReader.lines().collect(Collectors.toSet());
+ bufferedReader.close();
Review comment:
close these from within finally block.
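For illustration, the closes could also be handled with try-with-resources, which releases the streams even when an exception is thrown mid-read (it desugars to a finally block). This sketch substitutes an in-memory stream for the Hadoop `FSDataInputStream`, so it is a pattern demo rather than the actual Hudi code:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.stream.Collectors;

public class ReadMarkersExample {
    // try-with-resources closes the reader (and the wrapped stream) even if
    // lines()/collect() throws, matching the "close in finally" suggestion.
    static Set<String> readMarkers(InputStream in) throws IOException {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.toSet());
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "file1.marker.CREATE\nfile2.marker.APPEND\n"
            .getBytes(StandardCharsets.UTF_8);
        System.out.println(readMarkers(new ByteArrayInputStream(data)));
    }
}
```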
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
+ private Set<String> getAllMarkersFromFile(String markerDirPath) {
Review comment:
I assume when this api is invoked, there won't be any create requests
that could get triggered concurrently? if not, there could be a clash
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerFiles.java
##########
+public abstract class MarkerFiles implements Serializable {
Review comment:
java docs
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -210,6 +212,21 @@
.defaultValue("1500")
.withDocumentation("");
+ public static final ConfigProperty<String> MARKERS_IO_MODE = ConfigProperty
+ .key("hoodie.markers.io.mode")
+ .defaultValue(MarkerIOMode.TIMELINE_BASED.toString())
Review comment:
also, we need an upgrade step in this regards. something to take up
after this.
##########
File path:
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
+  public CompletableFuture<String> createMarker(Context context, String markerDirPath, String markerName) {
+    LOG.info("Request: create marker " + markerDirPath + " " + markerName);
+    CreateMarkerCompletableFuture future = new CreateMarkerCompletableFuture(context, markerDirPath, markerName);
+ synchronized (createMarkerRequestlockObject) {
+ createMarkerFutures.add(future);
+ long currTimeMs = System.currentTimeMillis();
+ if (currTimeMs >= nextBatchProcessTimeMs - MARGIN_TIME_MS) {
Review comment:
I will sync up w/ you offline on this code block. wanna see if we can
simplify this.
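One possible simplification of the block the reviewer flags (an illustrative assumption, not necessarily what the PR settles on; the class and method names here are hypothetical) is to drop the per-request next-batch-time arithmetic entirely and instead drain a lock-free queue on a fixed delay, treating empty batches as no-ops:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Drains pending marker-create requests on a fixed delay instead of
// computing the next batch time under a lock on every request.
public class FixedRateBatcher {
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    public FixedRateBatcher(long batchIntervalMs) {
        // Empty batches are cheap: processBatch returns immediately.
        scheduler.scheduleWithFixedDelay(
            this::processBatch, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
    }

    public void submit(String markerName) {
        pending.add(markerName);  // lock-free enqueue, no synchronized block
    }

    // Visible for testing; normally invoked only by the scheduler.
    List<String> processBatch() {
        List<String> batch = new ArrayList<>();
        String name;
        while ((name = pending.poll()) != null) {
            batch.add(name);
        }
        // A real handler would write all markers in `batch` through one
        // file handle here and then complete the pending futures.
        return batch;
    }

    public void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The trade-off is a bounded extra latency of up to one interval per request, in exchange for removing the shared mutable `nextBatchProcessTimeMs` state.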
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> Re-implement marker files via timeline server
> ---------------------------------------------
>
> Key: HUDI-1138
> URL: https://issues.apache.org/jira/browse/HUDI-1138
> Project: Apache Hudi
> Issue Type: Improvement
> Components: Writer Core
> Affects Versions: 0.9.0
> Reporter: Vinoth Chandar
> Assignee: Ethan Guo
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 0.9.0
>
>
> Even if you can argue that RFC-15/consolidated metadata removes the need for
> deleting partial files written due to Spark task failures/stage retries, it
> will still leave extra files inside the table (and users will pay for them
> every month), so we need the marker mechanism to be able to delete these
> partial files.
> Here we explore whether we can improve the current marker file mechanism,
> which creates one marker file per data file written, by delegating the
> createMarker() call to the driver/timeline server and having it write marker
> metadata into a single file handle that is flushed for durability guarantees.
>
> P.S: I was tempted to think the Spark listener mechanism could help us deal
> with failed tasks, but it has no guarantees: the writer job could die without
> deleting a partial file. That is, it can improve things, but it can't provide
> guarantees.
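The single-file-handle idea from the description can be sketched roughly as follows (a minimal illustration only; `SingleFileMarkerWriter` and its methods are hypothetical names, not Hudi's actual classes):

```java
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// One file handle per marker directory; each marker name is a line,
// flushed once per batch for durability, instead of one file per marker.
public class SingleFileMarkerWriter implements Closeable {
    private final BufferedWriter writer;

    public SingleFileMarkerWriter(Path markersFile) throws IOException {
        this.writer = Files.newBufferedWriter(markersFile, StandardCharsets.UTF_8,
            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public void writeBatch(List<String> markerNames) throws IOException {
        for (String name : markerNames) {
            writer.write(name);
            writer.newLine();
        }
        writer.flush();  // single durability point for the whole batch
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }
}
```

Compared to one file create per marker, this turns N small storage operations per batch into one append plus one flush, which is the main cost the timeline-server-based approach targets.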
--
This message was sent by Atlassian Jira
(v8.3.4#803005)