codope commented on code in PR #9553:
URL: https://github.com/apache/hudi/pull/9553#discussion_r1307430567


##########
hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java:
##########
@@ -25,69 +25,74 @@
  */
 public enum StorageSchemes {
   // Local filesystem
-  FILE("file", false, false, true),
+  FILE("file", false, false, true, false),
   // Hadoop File System
-  HDFS("hdfs", true, false, true),
+  HDFS("hdfs", true, false, true, true),
   // Baidu Advanced File System
-  AFS("afs", true, null, null),
+  AFS("afs", true, null, null, null),
   // Mapr File System
-  MAPRFS("maprfs", true, null, null),
+  MAPRFS("maprfs", true, null, null, null),
   // Apache Ignite FS
-  IGNITE("igfs", true, null, null),
+  IGNITE("igfs", true, null, null, null),
   // AWS S3
-  S3A("s3a", false, true, null), S3("s3", false, true, null),
+  S3A("s3a", false, true, null, false), S3("s3", false, true, null, false),
   // Google Cloud Storage
-  GCS("gs", false, true, null),
+  GCS("gs", false, true, null, null),

Review Comment:
   Why is GCS null, while S3 is false?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -276,4 +283,27 @@ protected static Option<IndexedRecord> 
toAvroRecord(HoodieRecord record, Schema
       return Option.empty();
     }
   }
+
+  protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback 
{
+    // we will create marker file for each log file and include all log files 
to commit message
+    // so that all log files generated in this job will be included in commit 
metadata. This is important for MDT.
+    // Assuming if spark task failed, the log file generated by it will not 
appear in WriteStatus
+
+    @Override
+    public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+      return createAppendMarker(logFileToAppend);
+    }
+
+    @Override
+    public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+      // TODO: HUDI-1517 may distinguish log file created from log file being 
appended in the future @guanziyue
+      return createAppendMarker(logFileToCreate);
+    }
+
+    private boolean createAppendMarker(HoodieLogFile logFileToAppend) {
+      WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+      return writeMarkers.create(partitionPath, logFileToAppend.getFileName(), 
IOType.APPEND,

Review Comment:
   `createIfNotExists`?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -104,6 +129,162 @@ public boolean commit(String instantTime, 
JavaRDD<WriteStatus> writeStatuses, Op
     return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses), 
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, 
extraPreCommitFunc);
   }
 
+  protected void commit(HoodieTable table, String commitActionType, String 
instantTime, HoodieCommitMetadata metadata,
+                        List<HoodieWriteStat> stats, HoodieData<WriteStatus> 
writeStatuses) throws IOException {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    HoodieCommitMetadata fixedCommitMetadata = 
addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata);
+    // Finalize write
+    finalizeWrite(table, instantTime, stats);
+    // do save internal schema to support Implicitly add columns in write 
process
+    if 
(!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+        && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) && 
table.getConfig().getSchemaEvolutionEnable()) {
+      saveInternalSchema(table, instantTime, fixedCommitMetadata);
+    }
+    // update Metadata table
+    writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses);
+    activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, 
instantTime),
+        
Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+  }
+
+  /* In spark mor table, any failed spark task may generate log files which 
are not included in write status.
+   * We need to add these to CommitMetadata so that it will be synced to MDT 
and make MDT has correct file info.
+   */
+  private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table, 
String commitActionType, String instantTime,
+                                                         HoodieCommitMetadata 
commitMetadata) throws IOException {
+    if 
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+        || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION)) 
{
+      return commitMetadata;
+    }
+
+    HoodieCommitMetadata metadata = commitMetadata;
+    WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), 
table, instantTime);
+    // if there is log files in this delta commit, we search any invalid log 
files generated by failed spark task
+    boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats()
+        .values().stream().flatMap(List::stream)
+        .anyMatch(writeStat -> FSUtils.isLogFile(new 
Path(config.getBasePath(), writeStat.getPath()).getName()));
+    if (hasLogFileInDeltaCommit) {
+      // get all log files generated by log mark file
+      Set<String> logFilesMarkerPath = new 
HashSet<>(markers.getAppendedLogPaths(context, 
config.getFinalizeWriteParallelism()));

Review Comment:
   this will incur a list call for the marker dir (per deltacommit)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1023,14 +1023,19 @@ public void update(HoodieRollbackMetadata 
rollbackMetadata, String instantTime)
 
       validateRollback(commitToRollbackInstantTime, compactionInstant, 
deltacommitsSinceCompaction);
 
-      // lets apply a delta commit with DT's rb instant(with special suffix) 
containing following records:
+      // lets apply a delta commit with DT's rb instant containing following 
records:
       // a. any log files as part of RB commit metadata that was added
       // b. log files added by the commit in DT being rolled back. By rolled 
back, we mean, a rollback block will be added and does not mean it will be 
deleted.
       // both above list should only be added to FILES partition.
-
-      String rollbackInstantTime = createRollbackTimestamp(instantTime);
+      if (deltacommitsSinceCompaction.containsInstant(instantTime)) {
+        LOG.info("Rolling back MDT deltacommit for " + instantTime + ", since 
previous attempt failed");
+        if (!getWriteClient().rollback(instantTime)) {

Review Comment:
   so a rollback in DT could trigger 2 rollbacks in MDT?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileWriteCallback.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+
+/**
+ * HoodieLogFileWriteCallback is trigger when specific log file operation 
happen
+ */
+public interface HoodieLogFileWriteCallback {
+  default boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+    return true;
+  }
+
+  default boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+    return true;
+  }
+
+  default boolean preLogFileClose(HoodieLogFile logFileToClose) {
+    return true;
+  }
+
+  default boolean postLogFileClose(HoodieLogFile logFileToClose) {

Review Comment:
   this does not close anything?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -79,14 +93,28 @@ public List<HoodieRollbackStat> 
performRollback(HoodieEngineContext context, Hoo
     // stack trace: 
https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
     // related stack overflow post: 
https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as 
GenericData.Array.
     List<SerializableHoodieRollbackRequest> serializableRequests = 
rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
-    return context.reduceByKey(maybeDeleteAndCollectStats(context, 
instantToRollback, serializableRequests, true, parallelism),
-        RollbackUtils::mergeRollbackStat, parallelism);
+    WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), 
table, instantTime);
+
+    // Considering rollback may failed before, which generated some additional 
log files. We need to add these log files back.
+    Set<String> logPaths = new HashSet<>();
+    try {
+      logPaths = markers.getAppendedLogPaths(context, 
config.getFinalizeWriteParallelism());
+    } catch (FileNotFoundException fnf) {
+      LOG.warn("Rollback never failed and hence no marker dir was found. 
Safely moving on");
+    } catch (IOException e) {
+      throw new HoodieRollbackException("Failed to list log file markers for 
previous attempt of rollback ", e);
+    }
+
+    List<Pair<String, HoodieRollbackStat>> getRollbackStats = 
maybeDeleteAndCollectStats(context, instantTime, instantToRollback, 
serializableRequests, true, parallelism,
+        logPaths);
+    List<HoodieRollbackStat> mergedRollbackStatByPartitionPath = 
context.reduceByKey(getRollbackStats, RollbackUtils::mergeRollbackStat, 
parallelism);

Review Comment:
   will trigger a shuffle



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java:
##########
@@ -25,69 +25,74 @@
  */
 public enum StorageSchemes {
   // Local filesystem
-  FILE("file", false, false, true),
+  FILE("file", false, false, true, false),
   // Hadoop File System
-  HDFS("hdfs", true, false, true),
+  HDFS("hdfs", true, false, true, true),
   // Baidu Advanced File System
-  AFS("afs", true, null, null),
+  AFS("afs", true, null, null, null),
   // Mapr File System
-  MAPRFS("maprfs", true, null, null),
+  MAPRFS("maprfs", true, null, null, null),
   // Apache Ignite FS
-  IGNITE("igfs", true, null, null),
+  IGNITE("igfs", true, null, null, null),
   // AWS S3
-  S3A("s3a", false, true, null), S3("s3", false, true, null),
+  S3A("s3a", false, true, null, false), S3("s3", false, true, null, false),
   // Google Cloud Storage
-  GCS("gs", false, true, null),
+  GCS("gs", false, true, null, null),
   // Azure WASB
-  WASB("wasb", false, null, null), WASBS("wasbs", false, null, null),
+  WASB("wasb", false, null, null, null), WASBS("wasbs", false, null, null, 
null),
   // Azure ADLS
-  ADL("adl", false, null, null),
+  ADL("adl", false, null, null, null),
   // Azure ADLS Gen2
-  ABFS("abfs", false, null, null), ABFSS("abfss", false, null, null),
+  ABFS("abfs", false, null, null, null), ABFSS("abfss", false, null, null, 
null),
   // Aliyun OSS
-  OSS("oss", false, null, null),
+  OSS("oss", false, null, null, null),
   // View FS for federated setups. If federating across cloud stores, then 
append support is false
   // View FS support atomic creation
-  VIEWFS("viewfs", true, null, true),
+  VIEWFS("viewfs", true, null, true, null),
   //ALLUXIO
-  ALLUXIO("alluxio", false, null, null),
+  ALLUXIO("alluxio", false, null, null, null),
   // Tencent Cloud Object Storage
-  COSN("cosn", false, null, null),
+  COSN("cosn", false, null, null, null),
   // Tencent Cloud HDFS
-  CHDFS("ofs", true, null, null),
+  CHDFS("ofs", true, null, null, null),
   // Tencent Cloud CacheFileSystem
-  GOOSEFS("gfs", false, null, null),
+  GOOSEFS("gfs", false, null, null, null),
   // Databricks file system
-  DBFS("dbfs", false, null, null),
+  DBFS("dbfs", false, null, null, null),
   // IBM Cloud Object Storage
-  COS("cos", false, null, null),
+  COS("cos", false, null, null, null),
   // Huawei Cloud Object Storage
-  OBS("obs", false, null, null),
+  OBS("obs", false, null, null, null),
   // Kingsoft Standard Storage ks3
-  KS3("ks3", false, null, null),
+  KS3("ks3", false, null, null, null),
   // JuiceFileSystem
-  JFS("jfs", true, null, null),
+  JFS("jfs", true, null, null, null),
   // Baidu Object Storage
-  BOS("bos", false, null, null),
+  BOS("bos", false, null, null, null),
   // Oracle Cloud Infrastructure Object Storage
-  OCI("oci", false, null, null),
+  OCI("oci", false, null, null, null),
   // Volcengine Object Storage
-  TOS("tos", false, null, null),
+  TOS("tos", false, null, null, null),
   // Volcengine Cloud HDFS
-  CFS("cfs", true, null, null);
+  CFS("cfs", true, null, null, null);
 
   private String scheme;
   private boolean supportsAppend;
   // null for uncertain if write is transactional, please update this for each 
FS
   private Boolean isWriteTransactional;
   // null for uncertain if dfs support atomic create&delete, please update 
this for each FS
   private Boolean supportAtomicCreation;
+  // list files may bring pressure to storage with centralized meta service 
like HDFS.
+  // when we want to get only part of files under a directory rather than all 
files, use getStatus may be more friendly than listStatus.
+  // here is a trade-off between rpc times and throughput of storage meta 
service
+  private Boolean listStatusUnfriendly;

Review Comment:
   do consider cloud costs as well. List calls on cloud object storage is not 
cheap.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -276,4 +283,27 @@ protected static Option<IndexedRecord> 
toAvroRecord(HoodieRecord record, Schema
       return Option.empty();
     }
   }
+
+  protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback 
{

Review Comment:
   This can be in `HoodieAppendHandle`.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -104,6 +129,162 @@ public boolean commit(String instantTime, 
JavaRDD<WriteStatus> writeStatuses, Op
     return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses), 
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, 
extraPreCommitFunc);
   }
 
+  protected void commit(HoodieTable table, String commitActionType, String 
instantTime, HoodieCommitMetadata metadata,
+                        List<HoodieWriteStat> stats, HoodieData<WriteStatus> 
writeStatuses) throws IOException {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    HoodieCommitMetadata fixedCommitMetadata = 
addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata);
+    // Finalize write
+    finalizeWrite(table, instantTime, stats);
+    // do save internal schema to support Implicitly add columns in write 
process
+    if 
(!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+        && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) && 
table.getConfig().getSchemaEvolutionEnable()) {
+      saveInternalSchema(table, instantTime, fixedCommitMetadata);
+    }
+    // update Metadata table
+    writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses);
+    activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, 
instantTime),
+        
Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+  }
+
+  /* In spark mor table, any failed spark task may generate log files which 
are not included in write status.
+   * We need to add these to CommitMetadata so that it will be synced to MDT 
and make MDT has correct file info.
+   */
+  private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table, 
String commitActionType, String instantTime,
+                                                         HoodieCommitMetadata 
commitMetadata) throws IOException {
+    if 
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+        || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION)) 
{
+      return commitMetadata;
+    }
+
+    HoodieCommitMetadata metadata = commitMetadata;
+    WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), 
table, instantTime);
+    // if there is log files in this delta commit, we search any invalid log 
files generated by failed spark task
+    boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats()
+        .values().stream().flatMap(List::stream)
+        .anyMatch(writeStat -> FSUtils.isLogFile(new 
Path(config.getBasePath(), writeStat.getPath()).getName()));
+    if (hasLogFileInDeltaCommit) {
+      // get all log files generated by log mark file
+      Set<String> logFilesMarkerPath = new 
HashSet<>(markers.getAppendedLogPaths(context, 
config.getFinalizeWriteParallelism()));
+
+      // remove valid log files
+      for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats : 
metadata.getPartitionToWriteStats().entrySet()) {
+        for (HoodieWriteStat hoodieWriteStat : 
partitionAndWriteStats.getValue()) {
+          logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+        }
+      }
+
+      // remaining are log files generated by failed spark task, let's 
generate write stat for them
+      if (logFilesMarkerPath.size() > 0) {
+        // populate partition -> map (fileId -> HoodieWriteStat) // we just 
need one write stat per fileID to fetch some info about the file slice of 
interest when we want to add a new WriteStat.
+        List<Pair<String, Map<String, HoodieWriteStat>>> 
partitionToFileIdAndWriteStatList = new ArrayList<>();
+        for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats : 
metadata.getPartitionToWriteStats().entrySet()) {
+          String partition = partitionAndWriteStats.getKey();
+          Map<String, HoodieWriteStat> fileIdToWriteStat = new HashMap<>();
+          partitionAndWriteStats.getValue().forEach(writeStat -> {
+            String fileId = writeStat.getFileId();
+            if (!fileIdToWriteStat.containsKey(fileId)) {
+              fileIdToWriteStat.put(fileId, writeStat);
+            }
+          });
+          partitionToFileIdAndWriteStatList.add(Pair.of(partition, 
fileIdToWriteStat));
+        }
+
+        final Path basePath = new Path(config.getBasePath());
+        Map<String, Map<String, List<String>>> 
partitionToFileIdAndMissingLogFiles = new HashMap<>();
+        logFilesMarkerPath
+            .stream()
+            .forEach(logFilePathStr -> {
+              Path logFileFullPath = new Path(config.getBasePath(), 
logFilePathStr);
+              String fileID = FSUtils.getFileId(logFileFullPath.getName());
+              String partitionPath = 
FSUtils.getRelativePartitionPath(basePath, logFileFullPath.getParent());
+              if 
(!partitionToFileIdAndMissingLogFiles.containsKey(partitionPath)) {
+                partitionToFileIdAndMissingLogFiles.put(partitionPath, new 
HashMap<>());
+              }
+              if 
(!partitionToFileIdAndMissingLogFiles.get(partitionPath).containsKey(fileID)) {
+                
partitionToFileIdAndMissingLogFiles.get(partitionPath).put(fileID, new 
ArrayList<>());
+              }
+              
partitionToFileIdAndMissingLogFiles.get(partitionPath).get(fileID).add(logFilePathStr);
+            });
+
+        context.setJobStatus(this.getClass().getSimpleName(), "generate 
writeStat for missing log files");
+
+        // populate partition -> map (fileId -> List <missing log file>)
+        List<Map.Entry<String, Map<String, List<String>>>> missingFilesInfo = 
partitionToFileIdAndMissingLogFiles.entrySet().stream().collect(Collectors.toList());
+        HoodiePairData<String, Map<String, List<String>>> 
partitionToMissingLogFilesHoodieData = 
context.parallelize(missingFilesInfo).mapToPair(
+            (SerializablePairFunction<Map.Entry<String, Map<String, 
List<String>>>, String, Map<String, List<String>>>) t -> Pair.of(t.getKey(), 
t.getValue()));
+
+        // populate partition -> map (fileId -> HoodieWriteStat) // we just 
need one write stat per fileId to fetch some info about the file slice of 
interest.
+        HoodiePairData<String, Map<String, HoodieWriteStat>> 
partitionToWriteStatHoodieData = 
context.parallelize(partitionToFileIdAndWriteStatList).mapToPair(
+            (SerializablePairFunction<Pair<String, Map<String, 
HoodieWriteStat>>, String, Map<String, HoodieWriteStat>>) t -> t);
+
+        SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(hadoopConf);
+
+        // lets do left outer join to add write stats for missing log files
+        List<Pair<String, List<HoodieWriteStat>>> additionalLogFileWriteStat = 
partitionToWriteStatHoodieData
+            .join(partitionToMissingLogFilesHoodieData)
+            .map((SerializableFunction<Pair<String, Pair<Map<String, 
HoodieWriteStat>, Map<String, List<String>>>>, Pair<String, 
List<HoodieWriteStat>>>) v1 -> {
+              final Path basePathLocal = new Path(config.getBasePath());
+              String partitionPath = v1.getKey();
+              Map<String, HoodieWriteStat> fileIdToOriginalWriteStat = 
v1.getValue().getKey();
+              Map<String, List<String>> missingFileIdToLogFilesList = 
v1.getValue().getValue();
+
+              List<HoodieWriteStat> missingWriteStats = new ArrayList();
+              List<String> missingLogFilesForPartition = new ArrayList();
+              missingFileIdToLogFilesList.values().forEach(entry -> 
missingLogFilesForPartition.addAll(entry));
+
+              // fetch file sizes for missing log files
+              Path fullPartitionPath = new Path(config.getBasePath(), 
partitionPath);
+              FileSystem fileSystem = 
fullPartitionPath.getFileSystem(serializableConfiguration.get());
+              List<Option<FileStatus>> fileStatues = 
FSUtils.getFileStatusesUnderPartition(fileSystem, fullPartitionPath, 
missingLogFilesForPartition, true);

Review Comment:
   then again list status per missing file right.. can we not change the 
`WriteMarkers#getAppendedLogPaths` to return a collection of `FileStatus` 
instead of strings.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -79,14 +93,28 @@ public List<HoodieRollbackStat> 
performRollback(HoodieEngineContext context, Hoo
     // stack trace: 
https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
     // related stack overflow post: 
https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as 
GenericData.Array.
     List<SerializableHoodieRollbackRequest> serializableRequests = 
rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
-    return context.reduceByKey(maybeDeleteAndCollectStats(context, 
instantToRollback, serializableRequests, true, parallelism),
-        RollbackUtils::mergeRollbackStat, parallelism);
+    WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), 
table, instantTime);
+
+    // Considering rollback may failed before, which generated some additional 
log files. We need to add these log files back.
+    Set<String> logPaths = new HashSet<>();
+    try {
+      logPaths = markers.getAppendedLogPaths(context, 
config.getFinalizeWriteParallelism());

Review Comment:
   another list operation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to