codope commented on code in PR #9776:
URL: https://github.com/apache/hudi/pull/9776#discussion_r1348240543
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -726,19 +727,24 @@ protected void
reconcileAgainstMarkers(HoodieEngineContext context,
return;
}
- // we are not including log appends here, since they are already
fail-safe.
- Set<String> invalidDataPaths = getInvalidDataPaths(markers);
- Set<String> validDataPaths = stats.stream()
+ // Ignores log file appended for update, since they are already
fail-safe.
+ // but new created log files should be included.
+ Set<String> invalidFilePaths = getInvalidDataPaths(markers);
Review Comment:
Let's avoid the rename? The method name is still `getInvalidDataPaths`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -951,12 +951,6 @@ private void startCommit(String instantTime, String
actionType, HoodieTableMetaC
+ "table could be in an inconsistent state. Pending restores: " +
Arrays.toString(inflightRestoreTimeline.getInstantsAsStream()
.map(instant ->
instant.getTimestamp()).collect(Collectors.toList()).toArray()));
- // if there are pending compactions, their instantTime must not be greater
than that of this instant time
-
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending
->
Review Comment:
Let's take this scenario where I have a pipeline with single writer W1 but
I'm also running an offline compactor (separate job) W2:
At t=10, 10.deltacommit completed by W1
At t=11, 11.deltacommit requested by W1 (as in just generated new instant
time but the commit has not gone inflight).
At t=12, 12.compaction.requested by W2 and goes inflight at 12.5 let's say
At t=13, 11.deltacommit.inflight
This validation used to guard against such scenarios. But now we're saying
that it's ok to let both the compaction and the deltacommit proceed. The compaction
plan will proceed as usual with the latest file slices at t=10. 11.deltacommit
will create a new slice, which will be missed by the compactor. However, the
filesystem view API based on the new slicing algorithm will ensure that the new
file slice is considered after the barrier created by the compaction base file.
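The barrier rule in this scenario can be sketched as follows (editor's illustration with hypothetical names, not the actual Hudi API): a log file is assigned relative to the compaction barrier by comparing its delta commit's completion time against the compaction base instant, since Hudi instant times are lexicographically comparable timestamps.

```java
public class SlicingBarrierSketch {
  // A delta commit that completes on/after the compaction instant lands in
  // the new file slice, after the barrier created by the compaction base file.
  static boolean belongsToNewSlice(String compactionInstant, String deltaCompletionTime) {
    return deltaCompletionTime.compareTo(compactionInstant) >= 0;
  }

  public static void main(String[] args) {
    // From the scenario: compaction scheduled at t=12, 11.deltacommit
    // completes only at t=13, so its log file falls after the barrier.
    System.out.println(belongsToNewSlice("12", "13")); // true
    // 10.deltacommit completed at t=10, before the barrier.
    System.out.println(belongsToNewSlice("12", "10")); // false
  }
}
```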
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -164,66 +163,68 @@ public HoodieAppendHandle(HoodieWriteConfig config,
String instantTime, HoodieTa
private void init(HoodieRecord record) {
if (doInit) {
- // extract some information from the first record
- SliceView rtView = hoodieTable.getSliceView();
- Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath,
fileId);
- // Set the base commit time as the current instantTime for new inserts
into log files
- String baseInstantTime;
+ String prevCommit = instantTime;
String baseFile = "";
List<String> logFiles = new ArrayList<>();
- if (fileSlice.isPresent()) {
- baseInstantTime = fileSlice.get().getBaseInstantTime();
- baseFile =
fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
- logFiles =
fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
- } else {
- baseInstantTime = instantTime;
- // Handle log file only case. This is necessary for the concurrent
clustering and writer case (e.g., consistent hashing bucket index).
- // NOTE: flink engine use instantTime to mark operation type, check
BaseFlinkCommitActionExecutor::execute
- if (record.getCurrentLocation() != null &&
HoodieInstantTimeGenerator.isValidInstantTime(record.getCurrentLocation().getInstantTime()))
{
- baseInstantTime = record.getCurrentLocation().getInstantTime();
+ if (config.isCDCEnabled()) {
Review Comment:
We need to have a test covering the CDC flow.
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -544,15 +543,27 @@ public static Option<Pair<Integer, String>>
getLatestLogVersion(FileSystem fs, P
}
/**
- * computes the next log version for the specified fileId in the partition
path.
+ * Returns whether the given path exists on the filesystem or false if any
exception occurs.
*/
- public static int computeNextLogVersion(FileSystem fs, Path partitionPath,
final String fileId,
- final String logFileExtension, final String baseCommitTime) throws
IOException {
- Option<Pair<Integer, String>> currentVersionWithWriteToken =
- getLatestLogVersion(fs, partitionPath, fileId, logFileExtension,
baseCommitTime);
- // handle potential overflow
- return (currentVersionWithWriteToken.isPresent()) ?
currentVersionWithWriteToken.get().getKey() + 1
- : HoodieLogFile.LOGFILE_BASE_VERSION;
+ private static boolean fileExists(FileSystem fs, Path path) {
Review Comment:
Why do we need it separately? Are we doing more `fs.exists` calls than what we
already have?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -69,6 +84,64 @@ public CompletionTimeQueryView(HoodieTableMetaClient
metaClient, String startIns
load();
}
+ /**
+ * Returns whether the instant is completed.
+ */
+ public boolean isCompleted(String instantTime) {
+ return getCompletionTime(instantTime).isPresent();
+ }
+
+ /**
+ * Returns whether the give instant time {@code instantTime} completed
before the base instant {@code baseInstant}.
+ */
+ public boolean isCompletedBefore(String baseInstant, String instantTime) {
+ Option<String> completionTimeOpt = getCompletionTime(baseInstant,
instantTime);
+ if (completionTimeOpt.isPresent()) {
+ return HoodieTimeline.compareTimestamps(completionTimeOpt.get(),
LESSER_THAN, baseInstant);
+ }
+ return false;
+ }
+
+ /**
+ * Returns whether the give instant time {@code instantTime} is sliced after
or on the base instant {@code baseInstant}.
+ */
+ public boolean isSlicedAfterOrOn(String baseInstant, String instantTime) {
+ Option<String> completionTimeOpt = getCompletionTime(baseInstant,
instantTime);
+ if (completionTimeOpt.isPresent()) {
+ return HoodieTimeline.compareTimestamps(completionTimeOpt.get(),
GREATER_THAN_OR_EQUALS, baseInstant);
+ }
+ return true;
+ }
+
+ /**
+ * Get completion time with a base instant time as a reference to fix the
compatibility.
+ *
+ * @param baseInstant The base instant
+ * @param instantTime The instant time to query the completion time with
+ *
+ * @return Probability fixed completion time.
+ */
+ public Option<String> getCompletionTime(String baseInstant, String
instantTime) {
Review Comment:
Need to cover these public methods in unit tests. I did see a test in
`TestHoodieFileGroup#testGetBaseInstantTime` with a mocked
CompletionTimeQueryView. But can we test the negative overlapping-writes
scenarios and ensure we're getting the view in the correct order?
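A sketch of the negative overlapping-writes case such a unit test could cover (editor's illustration; `FakeCompletionTimeView` is hypothetical, standing in for a `CompletionTimeQueryView` backed by a mocked timeline):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OverlappingWritesSketch {
  // Minimal in-memory stand-in for CompletionTimeQueryView.
  static class FakeCompletionTimeView {
    final Map<String, String> completionTimes = new HashMap<>();

    Optional<String> getCompletionTime(String instantTime) {
      return Optional.ofNullable(completionTimes.get(instantTime));
    }

    boolean isCompletedBefore(String baseInstant, String instantTime) {
      return getCompletionTime(instantTime).map(c -> c.compareTo(baseInstant) < 0).orElse(false);
    }

    boolean isSlicedAfterOrOn(String baseInstant, String instantTime) {
      // A pending instant (no completion time yet) conservatively slices after.
      return getCompletionTime(instantTime).map(c -> c.compareTo(baseInstant) >= 0).orElse(true);
    }
  }

  public static void main(String[] args) {
    FakeCompletionTimeView view = new FakeCompletionTimeView();
    view.completionTimes.put("10", "10"); // completed before the base instant 12
    view.completionTimes.put("11", "13"); // overlapping write: starts at 11, completes at 13
    System.out.println(view.isCompletedBefore("12", "10")); // true
    System.out.println(view.isCompletedBefore("12", "11")); // false
    System.out.println(view.isSlicedAfterOrOn("12", "11")); // true: after the barrier
  }
}
```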
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -138,6 +142,20 @@ protected void refreshTimeline(HoodieTimeline
visibleActiveTimeline) {
this.visibleCommitsAndCompactionTimeline =
visibleActiveTimeline.getWriteTimeline();
}
+ /**
+ * Refresh the completion time query view.
+ */
+ protected void refreshCompletionTimeQueryView() {
+ this.completionTimeQueryView = new CompletionTimeQueryView(metaClient);
Review Comment:
this is going to run a full reload including the archived (slim) timeline. Does it
need to happen more than once in the lifecycle of a query or batch write? I see
that it can be called by HoodieMetadataFileSystemView as well. We need to be a
bit careful here.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java:
##########
@@ -101,12 +103,36 @@ public void addBaseFile(HoodieBaseFile dataFile) {
/**
* Add a new log file into the group.
+ *
+ * <p>CAUTION: the log file must be added in sequence of the delta commit
time.
Review Comment:
Filesystem view calls this while building file groups, but that call is in a
synchronized block. However, I am assuming we have a multi-writer and
concurrent-reader test to validate. Or we can make this visible for testing and
start two threads to add log files and validate.
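The suggested validation could look roughly like this (editor's sketch with hypothetical names): reject a log file whose delta commit time sorts before the last one added, so any out-of-order caller fails fast instead of silently corrupting the slicing.

```java
import java.util.ArrayList;
import java.util.List;

public class OrderedLogFilesSketch {
  private final List<String> deltaCommitTimes = new ArrayList<>();

  // Stand-in for HoodieFileGroup#addLogFile with the proposed ordering check.
  void addLogFile(String deltaCommitTime) {
    if (!deltaCommitTimes.isEmpty()
        && deltaCommitTime.compareTo(deltaCommitTimes.get(deltaCommitTimes.size() - 1)) < 0) {
      throw new IllegalStateException(
          "Log file with delta commit time " + deltaCommitTime + " added out of order");
    }
    deltaCommitTimes.add(deltaCommitTime);
  }

  public static void main(String[] args) {
    OrderedLogFilesSketch group = new OrderedLogFilesSketch();
    group.addLogFile("10");
    group.addLogFile("11"); // in order: fine
    try {
      group.addLogFile("09"); // out of order: rejected
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```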
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -198,6 +171,26 @@ public List<HoodieRollbackRequest>
getRollbackRequests(HoodieInstant instantToRo
}
}
+ private FileStatus[] listAllFilesBelongsToOrNewerThanVersion(
Review Comment:
rename to `listAllFilesSinceCommit`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -233,41 +234,45 @@ private static Schema getWriteSchema(HoodieWriteConfig
config) {
return new Schema.Parser().parse(config.getWriteSchema());
}
- protected HoodieLogFormat.Writer createLogWriter(
- Option<FileSlice> fileSlice, String baseCommitTime) throws IOException {
- return createLogWriter(fileSlice, baseCommitTime, null);
+ protected HoodieLogFormat.Writer createLogWriter(String deltaCommitTime) {
+ return createLogWriter(deltaCommitTime, null);
}
- protected HoodieLogFormat.Writer createLogWriter(
- Option<FileSlice> fileSlice, String baseCommitTime, String suffix)
throws IOException {
- Option<HoodieLogFile> latestLogFile = fileSlice.isPresent()
- ? fileSlice.get().getLatestLogFile()
- : Option.empty();
-
- return HoodieLogFormat.newWriterBuilder()
-
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(),
partitionPath))
- .withFileId(fileId)
- .overBaseCommit(baseCommitTime)
-
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
- .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
- .withSizeThreshold(config.getLogFileMaxSize())
- .withFs(fs)
- .withRolloverLogWriteToken(writeToken)
-
.withLogWriteToken(latestLogFile.map(HoodieLogFile::getLogWriteToken).orElse(writeToken))
- .withSuffix(suffix)
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
- }
-
- protected HoodieLogFormat.Writer createLogWriter(String baseCommitTime,
String fileSuffix) {
+ protected HoodieLogFormat.Writer createLogWriter(String deltaCommitTime,
String fileSuffix) {
try {
- return createLogWriter(Option.empty(),baseCommitTime, fileSuffix);
+ return HoodieLogFormat.newWriterBuilder()
+
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(),
partitionPath))
+ .withFileId(fileId)
+ .withDeltaCommit(deltaCommitTime)
+ .withFileSize(0L)
Review Comment:
Shouldn't this be `HoodieLogFile#getFileSize`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -164,66 +163,68 @@ public HoodieAppendHandle(HoodieWriteConfig config,
String instantTime, HoodieTa
private void init(HoodieRecord record) {
if (doInit) {
- // extract some information from the first record
- SliceView rtView = hoodieTable.getSliceView();
- Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath,
fileId);
- // Set the base commit time as the current instantTime for new inserts
into log files
- String baseInstantTime;
+ String prevCommit = instantTime;
String baseFile = "";
List<String> logFiles = new ArrayList<>();
- if (fileSlice.isPresent()) {
- baseInstantTime = fileSlice.get().getBaseInstantTime();
- baseFile =
fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
- logFiles =
fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
- } else {
- baseInstantTime = instantTime;
- // Handle log file only case. This is necessary for the concurrent
clustering and writer case (e.g., consistent hashing bucket index).
- // NOTE: flink engine use instantTime to mark operation type, check
BaseFlinkCommitActionExecutor::execute
- if (record.getCurrentLocation() != null &&
HoodieInstantTimeGenerator.isValidInstantTime(record.getCurrentLocation().getInstantTime()))
{
- baseInstantTime = record.getCurrentLocation().getInstantTime();
+ if (config.isCDCEnabled()) {
+ // the cdc reader needs the base file metadata to have deterministic
update sequence.
+ TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
+ Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath,
fileId);
+ if (fileSlice.isPresent()) {
+ prevCommit = fileSlice.get().getBaseInstantTime();
+ baseFile =
fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
+ logFiles =
fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
}
- // This means there is no base data file, start appending to a new log
file
- fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime,
this.fileId));
- LOG.info("New AppendHandle for partition :" + partitionPath);
}
// Prepare the first write status
- writeStatus.setStat(new HoodieDeltaWriteStat());
+ HoodieDeltaWriteStat deltaWriteStat = new HoodieDeltaWriteStat();
+ writeStatus.setStat(deltaWriteStat);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
averageRecordSize = sizeEstimator.sizeEstimate(record);
- HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat)
writeStatus.getStat();
- deltaWriteStat.setPrevCommit(baseInstantTime);
+ deltaWriteStat.setPrevCommit(prevCommit);
deltaWriteStat.setPartitionPath(partitionPath);
deltaWriteStat.setFileId(fileId);
deltaWriteStat.setBaseFile(baseFile);
deltaWriteStat.setLogFiles(logFiles);
try {
// Save hoodie partition meta in the partition path
- HoodiePartitionMetadata partitionMetadata = new
HoodiePartitionMetadata(fs, baseInstantTime,
+ HoodiePartitionMetadata partitionMetadata = new
HoodiePartitionMetadata(fs, instantTime,
new Path(config.getBasePath()),
FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
hoodieTable.getPartitionMetafileFormat());
partitionMetadata.trySave(getPartitionId());
- // Since the actual log file written to can be different based on when
rollover happens, we use the
- // base file to denote some log appends happened on a slice.
writeToken will still fence concurrent
- // writers.
- // https://issues.apache.org/jira/browse/HUDI-1517
- createMarkerFile(partitionPath,
FSUtils.makeBaseFileName(baseInstantTime, writeToken, fileId,
hoodieTable.getBaseFileExtension()));
-
- this.writer = createLogWriter(fileSlice, baseInstantTime);
+ this.writer = createLogWriter(getFileInstant(record));
} catch (Exception e) {
LOG.error("Error in update task at commit " + instantTime, e);
writeStatus.setGlobalError(e);
throw new HoodieUpsertException("Failed to initialize
HoodieAppendHandle for FileId: " + fileId + " on commit "
- + instantTime + " on HDFS path " +
hoodieTable.getMetaClient().getBasePath() + "/" + partitionPath, e);
+ + instantTime + " on HDFS path " +
hoodieTable.getMetaClient().getBasePathV2() + "/" + partitionPath, e);
}
doInit = false;
}
}
+ /**
+ * Returns the instant time to use in the log file name.
+ */
+ private String getFileInstant(HoodieRecord<?> record) {
+ if (config.isConsistentHashingEnabled()) {
+ // Handle log file only case. This is necessary for the concurrent
clustering and writer case (e.g., consistent hashing bucket index).
+ // NOTE: flink engine use instantTime to mark operation type, check
BaseFlinkCommitActionExecutor::execute
Review Comment:
This is possible even when we use certain index types such that inserts can
go directly to log files (e.g. the in-memory index). But in that case, we may not
be using the instant time to mark the operation type. How do we handle such
cases?
I think this logic is very engine-specific. Is this the right place for it?
Can we not resolve the instant time a layer above?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java:
##########
@@ -90,15 +101,25 @@ public void withOutputStream(FSDataOutputStream output) {
*/
private FSDataOutputStream getOutputStream() throws IOException {
if (this.output == null) {
- Path path = logFile.getPath();
- if (fs.exists(path)) {
- rollOver();
- createNewFile();
- LOG.info("File {} already exists, rolling over to {}", path,
logFile.getPath());
- } else {
- LOG.info(logFile + " does not exist. Create a new file");
- // Block size does not matter as we will always manually autoflush
- createNewFile();
+ boolean created = false;
+ while (!created) {
+ try {
+ // Block size does not matter as we will always manually autoflush
+ createNewFile();
+ LOG.info("Created a new log file: {}", logFile);
+ created = true;
+ } catch (FileAlreadyExistsException ignored) {
+ LOG.info("File {} already exists, rolling over", logFile.getPath());
+ rollOver();
+ } catch (RemoteException re) {
+ if
(re.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()))
{
+ LOG.warn("Another task executor writing to the same log file(" +
logFile + ", rolling over");
+ // Rollover the current log file (since cannot get a stream
handle) and create new one
+ rollOver();
+ } else {
+ throw re;
Review Comment:
When will it come to this else block? Also, please throw a HoodieException
with some message.
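One way to shape the loop with the suggested wrapping (editor's sketch under stated assumptions: `FileCreator` is hypothetical and stands in for `fs.create` on the log file path, and `RuntimeException` stands in for `HoodieException`):

```java
import java.nio.file.FileAlreadyExistsException;
import java.util.Set;

public class CreateWithRolloverSketch {
  interface FileCreator {
    void create(String path) throws Exception;
  }

  // Keep rolling the version forward until a fresh file is created; wrap any
  // other failure in a descriptive exception instead of rethrowing it raw.
  static String createWithRollover(String baseName, FileCreator creator) {
    int version = 1;
    while (true) {
      String path = baseName + ".v" + version;
      try {
        creator.create(path);
        return path;
      } catch (FileAlreadyExistsException e) {
        version++; // another writer got here first: roll over
      } catch (Exception e) {
        throw new RuntimeException("Failed to create log file " + path, e);
      }
    }
  }

  public static void main(String[] args) {
    Set<String> existing = Set.of("fg1.log.v1", "fg1.log.v2");
    String created = createWithRollover("fg1.log", path -> {
      if (existing.contains(path)) {
        throw new FileAlreadyExistsException(path);
      }
    });
    System.out.println(created); // fg1.log.v3
  }
}
```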
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java:
##########
@@ -844,6 +847,47 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>>
twoUpsertCommitDataWithTwoP
return Pair.of(records, records2);
}
+ /**
+ * Since how markers are generated for log file changed in Version Six, we
regenerate markers in the way version zero do.
+ *
+ * @param table instance of {@link HoodieTable}
+ */
+ private void prepForUpgradeFromZeroToOne(HoodieTable table) throws
IOException {
Review Comment:
Why do we need it here? The marker-for-log-files PR should have already covered
this test, right? Is the 0 to 1 upgrade flaky?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java:
##########
@@ -118,8 +120,21 @@ public HoodieCompactionPlan generateCompactionPlan()
throws IOException {
.getLatestFileSlices(partitionPath)
.filter(slice -> filterFileSlice(slice, lastCompletedInstantTime,
fgIdsInPendingCompactionAndClustering, instantRange))
.map(s -> {
- List<HoodieLogFile> logFiles =
-
s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList());
+ List<HoodieLogFile> logFiles = s.getLogFiles()
+ // ==============================================================
+ // IMPORTANT
+ // ==============================================================
+ // Currently, our filesystem view could return a file slice with
pending log files there,
+ // these files should be excluded from the plan, let's say we
have such a sequence of actions
+
+ // t10: a delta commit starts,
+ // t20: the compaction is scheduled and the t10 delta commit is
still pending, and the "fg_10.log" is included in the plan
+ // t25: the delta commit 10 finishes,
+ // t30: the compaction execution starts, now the reader
considers the log file "fg_10.log" as valid.
+
+ // for both OCC and NB-CC, this is in-correct.
Review Comment:
Let's add a unit test to cover this scenario.
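The scenario in the quoted comment boils down to filtering by completion at schedule time; such a unit test could exercise something like this (editor's sketch, hypothetical names):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class PendingLogFileFilterSketch {
  // Only log files whose delta commit had completed when the compaction was
  // scheduled may enter the plan; pending ones must be excluded.
  static List<String> logFilesForPlan(Map<String, String> logFileToDeltaCommit,
                                      Set<String> completedAtScheduleTime) {
    return logFileToDeltaCommit.entrySet().stream()
        .filter(e -> completedAtScheduleTime.contains(e.getValue()))
        .map(Map.Entry::getKey)
        .sorted()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // The t10 delta commit is still pending when the compaction is scheduled
    // at t20, so fg_10.log must not be planned even though it exists on storage.
    Map<String, String> logFiles = Map.of("fg_05.log", "5", "fg_10.log", "10");
    Set<String> completed = Set.of("5");
    System.out.println(logFilesForPlan(logFiles, completed)); // [fg_05.log]
  }
}
```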
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -16,27 +16,33 @@
* limitations under the License.
*/
-package org.apache.hudi.client.timeline;
+package org.apache.hudi.common.table.timeline;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.avro.generic.GenericRecord;
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.COMPLETION_TIME_ARCHIVED_META_FIELD;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.EQUALS;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
/**
* Query view for instant completion time.
*/
-public class CompletionTimeQueryView implements AutoCloseable {
+public class CompletionTimeQueryView implements AutoCloseable, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final long MILLI_SECONDS_IN_THREE_DAYS = 3 * 24 * 3600 * 1000;
Review Comment:
Do we need some internal config to control this?
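If a knob is added, a minimal shape could be the following (editor's sketch; the key name is hypothetical, and in Hudi this would live in a `ConfigProperty` rather than a raw properties lookup). The default mirrors the hard-coded three days from the diff:

```java
import java.util.Map;

public class CompletionTimeTtlConfigSketch {
  // Hypothetical config key controlling how far back completion times are kept.
  static final String KEY = "hoodie.completion.time.cache.ttl.ms";
  static final long DEFAULT_TTL_MS = 3L * 24 * 3600 * 1000; // three days

  static long resolveTtlMs(Map<String, String> props) {
    return props.containsKey(KEY) ? Long.parseLong(props.get(KEY)) : DEFAULT_TTL_MS;
  }

  public static void main(String[] args) {
    System.out.println(resolveTtlMs(Map.of()));            // 259200000
    System.out.println(resolveTtlMs(Map.of(KEY, "1000"))); // 1000
  }
}
```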
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -544,15 +543,27 @@ public static Option<Pair<Integer, String>>
getLatestLogVersion(FileSystem fs, P
}
/**
- * computes the next log version for the specified fileId in the partition
path.
+ * Returns whether the given path exists on the filesystem or false if any
exception occurs.
*/
- public static int computeNextLogVersion(FileSystem fs, Path partitionPath,
final String fileId,
- final String logFileExtension, final String baseCommitTime) throws
IOException {
- Option<Pair<Integer, String>> currentVersionWithWriteToken =
- getLatestLogVersion(fs, partitionPath, fileId, logFileExtension,
baseCommitTime);
- // handle potential overflow
- return (currentVersionWithWriteToken.isPresent()) ?
currentVersionWithWriteToken.get().getKey() + 1
- : HoodieLogFile.LOGFILE_BASE_VERSION;
+ private static boolean fileExists(FileSystem fs, Path path) {
+ try {
+ return fs.exists(path);
+ } catch (Exception ignored) {
+ return false;
+ }
+ }
+
+ /**
+ * Computes the next log version for the specified fileId in the partition
path.
+ */
+ public static int computeNextLogVersion(int curVersion, FileSystem fs, Path
partitionPath, final String fileId,
Review Comment:
not being used anywhere?
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java:
##########
Review Comment:
let's move this test class to hudi-common as well? And also, add more tests
for the new methods?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -216,6 +233,9 @@ protected List<HoodieFileGroup>
buildFileGroups(Stream<HoodieBaseFile> baseFileS
group.addNewFileSliceAtInstant(pendingCompaction.get().getKey());
}
}
+ if (logFiles.containsKey(pair)) {
+
logFiles.get(pair).stream().sorted(HoodieLogFile.getLogFileComparator()).forEach(logFile
-> group.addLogFile(completionTimeQueryView, logFile));
+ }
Review Comment:
OK, I see: here it's being sorted, so log files will be added in sequence of
delta commit time. As long as there is one caller, it's fine. But how do we ensure
that other callers don't abuse it? Should we then add a validation in
`filegroup.addLogFile`?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java:
##########
@@ -109,4 +110,11 @@ public static boolean
recordTypeCompatibleEngine(HoodieRecordType recordType, En
public static HoodieRecordMerger mergerToPreCombineMode(HoodieRecordMerger
merger) {
return merger instanceof OperationModeAwareness ?
((OperationModeAwareness) merger).asPreCombiningMode() : merger;
}
+
+ public static String getCurrentLocationInstant(HoodieRecord<?> record) {
+ if (record.getCurrentLocation() != null) {
+ return record.getCurrentLocation().getInstantTime();
+ }
+ return null;
Review Comment:
return an `Option`?
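An `Option`-returning shape could look like this (editor's sketch; `java.util.Optional` keeps it self-contained, whereas the real code would use `org.apache.hudi.common.util.Option` and `HoodieRecord`):

```java
import java.util.Optional;

public class CurrentLocationInstantSketch {
  // Minimal stand-in for HoodieRecord#getCurrentLocation#getInstantTime.
  interface Located {
    String instantTimeOrNull();
  }

  static Optional<String> getCurrentLocationInstant(Located record) {
    return Optional.ofNullable(record.instantTimeOrNull());
  }

  public static void main(String[] args) {
    System.out.println(getCurrentLocationInstant(() -> "20231005120000")); // Optional[20231005120000]
    System.out.println(getCurrentLocationInstant(() -> null));             // Optional.empty
  }
}
```

Callers can then chain `map`/`orElse` instead of null-checking.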
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -516,10 +516,13 @@ public void
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
// We want the upsert to go through only after the compaction
// and cleaning schedule completion. So, waiting on latch here.
latchCountDownAndWait(scheduleCountDownLatch, 30000);
- if (tableType == HoodieTableType.MERGE_ON_READ) {
- // Since the compaction already went in, this upsert has
+ if (tableType == HoodieTableType.MERGE_ON_READ && !(resolutionStrategy
instanceof PreferWriterConflictResolutionStrategy)) {
+ // HUDI-6897: Improve
SimpleConcurrentFileWritesConflictResolutionStrategy for NB-CC
Review Comment:
This should be an immediate follow-up. Let's fix the constraints in the resolution
strategy.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -126,21 +126,19 @@ public void testMergeOnReadRollbackActionExecutor(boolean
isUsingMarkers) throws
for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry :
rollbackMetadata.entrySet()) {
HoodieRollbackPartitionMetadata meta = entry.getValue();
assertEquals(0, meta.getFailedDeleteFiles().size());
- assertEquals(0, meta.getSuccessDeleteFiles().size());
+ assertEquals(1, meta.getSuccessDeleteFiles().size());
Review Comment:
Not following the logic here. Why should there be 1 successful delete at
this stage?
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -544,15 +543,40 @@ public static Option<Pair<Integer, String>>
getLatestLogVersion(FileSystem fs, P
}
/**
- * computes the next log version for the specified fileId in the partition
path.
+ * Get the latest log version for the fileId in the partition path.
+ */
+ public static int getLatestLogVersion(FileSystem fs, Path partitionPath,
final String fileId,
+ final String
logFileExtension, final String deltaCommitTime, final String writeToken) throws
IOException {
+ int version = HoodieLogFile.LOGFILE_BASE_VERSION;
+ String logFileName = makeLogFileName(fileId, logFileExtension,
deltaCommitTime, version, writeToken);
+ while (fileExists(fs, new Path(partitionPath, logFileName))) {
Review Comment:
Yes, we should not need the exists check in
`HoodieLogFormatWriter.getOutputStream`. Go ahead with creating the new log file;
if it already exists (which will be known from the filesystem exception thrown upon
creation), just roll over. Let's remove all `fileExists` usages.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]