boneanxs commented on code in PR #9617:
URL: https://github.com/apache/hudi/pull/9617#discussion_r1351289121
##########
packaging/hudi-kafka-connect-bundle/pom.xml:
##########
@@ -107,6 +107,7 @@
<include>com.lmax:disruptor</include>
<include>com.github.davidmoten:guava-mini</include>
<include>com.github.davidmoten:hilbert-curve</include>
+ <include>com.github.ben-manes.caffeine:caffeine</include>
Review Comment:
I think this is actually a missing dependency in hudi-kafka-connect-bundle; all
other bundles already include it. Without it, hudi-kafka-connect-bundle could
hit a ClassNotFound error even before this PR when accessing
`InternalSchemaCache` (that cache already uses a Caffeine cache and lives in
hudi-common).
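One way to confirm whether a bundle actually ships Caffeine is a small classpath probe run with the bundle jar on the classpath. This is a hypothetical helper (`ClasspathProbe` and `isOnClasspath` are illustrative names, not part of Hudi); it only relies on `Class.forName` with `initialize=false`:

```java
// Hypothetical diagnostic (not Hudi code): checks whether a class can be
// resolved from the current classpath without running its static initializers.
final class ClasspathProbe {
  private ClasspathProbe() {}

  static boolean isOnClasspath(String className) {
    try {
      // initialize=false: only load/resolve the class, do not initialize it
      Class.forName(className, false, ClasspathProbe.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      // missing class, or a class whose own dependencies are missing
      return false;
    }
  }
}
```

For example, `ClasspathProbe.isOnClasspath("com.github.benmanes.caffeine.cache.Caffeine")` returning `false` would point to the missing dependency.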
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java:
##########
@@ -137,6 +137,7 @@ protected void completeClustering(
LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
table.getActiveTimeline().transitionReplaceInflightToComplete(
+ false,
Review Comment:
The lock is already guaranteed here: this operation (inflight -> complete) is
wrapped by `this.txnManager.beginTransaction` and
`this.txnManager.endTransaction`, so there is no need to take it again.
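A minimal model of why the inner transition should not take the lock again, assuming the lock is not reentrant (modeled here with a one-permit `Semaphore`). `TxnManagerSketch` and its methods are hypothetical and only mirror the shape of `beginTransaction`/`endTransaction`:

```java
import java.util.concurrent.Semaphore;

// Hypothetical model (not Hudi's API): a transaction manager whose lock is
// NOT reentrant, represented by a Semaphore with a single permit.
final class TxnManagerSketch {
  private final Semaphore lock = new Semaphore(1);

  void beginTransaction() {
    lock.acquireUninterruptibly(); // take the only permit
  }

  void endTransaction() {
    lock.release(); // give the permit back
  }

  // Returns true if the transition could run, false if it could not get the lock.
  boolean transitionInflightToComplete(boolean shouldLock) {
    if (!shouldLock) {
      // Caller (code between beginTransaction/endTransaction) already holds the lock.
      return true;
    }
    // Non-blocking attempt: a blocking acquire here would wait forever if the
    // same caller already holds the single permit.
    if (!lock.tryAcquire()) {
      return false;
    }
    try {
      return true; // ... write the completed instant file here ...
    } finally {
      lock.release();
    }
  }
}
```

Inside the transaction, `transitionInflightToComplete(false)` succeeds while `transitionInflightToComplete(true)` cannot obtain the permit, which is the situation passing `false` avoids.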
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java:
##########
@@ -198,8 +204,23 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
in.defaultReadObject();
}
+ /**
+ * Create a complete instant and save to storage with a completion time.
+ * @param shouldLock whether the lock should be enabled.
+ * @param instant the complete instant.
+ */
+ public void createCompleteInstant(boolean shouldLock, HoodieInstant instant) {
Review Comment:
`InProcessLockProvider` is not accessible from `hudi-common`, so tests in
hudi-common have to set `shouldLock` to false. Once `InProcessLockProvider` is
moved to `hudi-common`, we won't need to pass this flag at all; `shouldLock`
will always be true.
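A sketch of how a `shouldLock` flag can gate locking, assuming a `java.util.concurrent.locks.Lock` stands in for the lock provider; `OptionalLocking.withOptionalLock` is a hypothetical helper, not Hudi code:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical helper: run an action under the lock only when shouldLock is true.
final class OptionalLocking {
  private OptionalLocking() {}

  static <T> T withOptionalLock(boolean shouldLock, Lock lock, Supplier<T> action) {
    if (!shouldLock) {
      // e.g. tests in a module where no lock provider is available
      return action.get();
    }
    lock.lock();
    try {
      return action.get();
    } finally {
      lock.unlock(); // always release, even if the action throws
    }
  }
}
```

Once the flag is always true, callers could drop the parameter and the helper collapses to the plain lock/try/finally pattern.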
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -187,7 +188,11 @@ public static String maskWithoutFileId(String instantTime, int taskPartitionId)
}
public static String getCommitFromCommitFile(String commitFileName) {
- return commitFileName.split("\\.")[0];
+ try {
+ return HoodieInstant.extractTimestamp(commitFileName);
+ } catch (IllegalArgumentException e) {
+ return "";
Review Comment:
Some callers don't pass the file name correctly and may pass a non-commit
file; I will change those callers.
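A self-contained sketch of the new behaviour next to the old `split("\\.")[0]`. The file-name pattern below is an assumption (instant time, optional `_completionTime`, action, optional state) and `CommitFileNames` is hypothetical; the real parsing is done by `HoodieInstant.extractTimestamp`:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch; not the actual HoodieInstant implementation.
final class CommitFileNames {
  private CommitFileNames() {}

  // Assumed layout: "<instantTime>[_<completionTime>].<action>[.<state>]"
  private static final Pattern NAME = Pattern.compile("^(\\d+)(_\\d+)?(\\.\\w+)(\\.\\D+)?$");

  static String extractTimestamp(String fileName) {
    Matcher m = NAME.matcher(fileName);
    if (!m.matches()) {
      throw new IllegalArgumentException("Not a timeline file: " + fileName);
    }
    return m.group(1); // the instant time only, without the completion time
  }

  static String getCommitFromCommitFile(String commitFileName) {
    try {
      return extractTimestamp(commitFileName);
    } catch (IllegalArgumentException e) {
      return ""; // a caller passed a non-commit file
    }
  }
}
```

With the old split, a completed file such as `20230901000000000_20230901000001000.commit` would have yielded `20230901000000000_20230901000001000` instead of the instant time.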
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java:
##########
@@ -573,54 +624,85 @@ public HoodieInstant transitionReplaceRequestedToInflight(HoodieInstant requeste
ValidationUtils.checkArgument(requestedInstant.isRequested());
HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, REPLACE_COMMIT_ACTION, requestedInstant.getTimestamp());
// Then write to timeline
- transitionState(requestedInstant, inflightInstant, data);
+ transitionPendingState(requestedInstant, inflightInstant, data);
return inflightInstant;
}
/**
* Transition replace inflight to Committed.
*
+ * @param shouldLock Whether to hold the lock when performing transition
* @param inflightInstant Inflight instant
* @param data Extra Metadata
* @return commit instant
*/
- public HoodieInstant transitionReplaceInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
+ public HoodieInstant transitionReplaceInflightToComplete(boolean shouldLock,
+     HoodieInstant inflightInstant, Option<byte[]> data) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
ValidationUtils.checkArgument(inflightInstant.isInflight());
HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, REPLACE_COMMIT_ACTION, inflightInstant.getTimestamp());
// Then write to timeline
- transitionState(inflightInstant, commitInstant, data);
+ transitionStateToComplete(shouldLock, inflightInstant, commitInstant, data);
return commitInstant;
}
- private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
- transitionState(fromInstant, toInstant, data, false);
+ private void transitionPendingState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
+ transitionPendingState(fromInstant, toInstant, data, false);
+ }
+
+ protected void transitionStateToComplete(boolean shouldLock, HoodieInstant fromInstant,
+     HoodieInstant toInstant, Option<byte[]> data) {
+ ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()), String.format("%s and %s are not consistent when transition state.", fromInstant, toInstant));
+ String fromInstantFileName = fromInstant.getFileName();
+ // Ensures old state exists in timeline
+ LOG.info("Checking for file exists ?" + getInstantFileNamePath(fromInstantFileName));
+ try {
+ if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
+ // Re-create the .inflight file by opening a new file and write the commit metadata in
+ createFileInMetaPath(fromInstantFileName, data, false);
+ Path fromInstantPath = getInstantFileNamePath(fromInstantFileName);
+ HoodieInstant instantWithCompletionTime = new HoodieInstant(toInstant.getState(), toInstant.getAction(),
+     toInstant.getTimestamp(), metaClient.createNewInstantTime(false));
+ Path toInstantPath = getInstantFileNamePath(instantWithCompletionTime.getFileName());
+ boolean success = metaClient.getFs().rename(fromInstantPath, toInstantPath);
Review Comment:
This is just copied from `transitionPendingState` to handle the old timeline format:
```java
protected void transitionPendingState(...) {
  ...
  if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
    // Re-create the .inflight file by opening a new file and write the commit metadata in
    createFileInMetaPath(fromInstantFileName, data, allowRedundantTransitions);
    Path fromInstantPath = getInstantFileNamePath(fromInstantFileName);
    Path toInstantPath = getInstantFileNamePath(toInstantFileName);
    boolean success = metaClient.getFs().rename(fromInstantPath, toInstantPath);
    ...
  }
```
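For reference, the old-layout sequence (write the metadata into the from-file, then rename it to the completed file name) can be sketched with `java.nio.file`. This is only an illustration: `LegacyTransitionSketch` is hypothetical, the file names are made up, and it uses the local file system instead of Hadoop's `FileSystem.rename`:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of the null-layout-version path: (re)write the inflight
// file with the commit metadata, then rename it to the completed file name.
final class LegacyTransitionSketch {
  private LegacyTransitionSketch() {}

  static Path transitionToComplete(Path metaDir, String fromFileName, String toFileName,
                                   byte[] data) throws IOException {
    Path from = metaDir.resolve(fromFileName);
    // Create or overwrite the inflight file so it carries the final metadata.
    Files.write(from, data, StandardOpenOption.CREATE,
        StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
    Path to = metaDir.resolve(toFileName);
    // Files.move throws FileAlreadyExistsException if the target exists.
    return Files.move(from, to);
  }
}
```

One difference worth noting: `Files.move` signals failure by throwing, whereas Hadoop's `FileSystem.rename` reports it through its boolean return value, which is why the original code captures `success`.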
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java:
##########
@@ -447,37 +490,41 @@ public HoodieInstant transitionLogCompactionRequestedToInflight(HoodieInstant re
ValidationUtils.checkArgument(requestedInstant.isRequested());
HoodieInstant inflightInstant =
new HoodieInstant(State.INFLIGHT, LOG_COMPACTION_ACTION, requestedInstant.getTimestamp());
- transitionState(requestedInstant, inflightInstant, Option.empty());
+ transitionPendingState(requestedInstant, inflightInstant, Option.empty());
return inflightInstant;
}
/**
* Transition Compaction State from inflight to Committed.
*
+ * @param shouldLock Whether to hold the lock when performing transition
* @param inflightInstant Inflight instant
* @param data Extra Metadata
* @return commit instant
*/
- public HoodieInstant transitionCompactionInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
+ public HoodieInstant transitionCompactionInflightToComplete(boolean shouldLock, HoodieInstant inflightInstant,
+     Option<byte[]> data) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
ValidationUtils.checkArgument(inflightInstant.isInflight());
HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, COMMIT_ACTION, inflightInstant.getTimestamp());
- transitionState(inflightInstant, commitInstant, data);
+ transitionStateToComplete(shouldLock, inflightInstant, commitInstant, data);
return commitInstant;
}
/**
* Transition Log Compaction State from inflight to Committed.
*
+ * @param shouldLock Whether to hold the lock when performing transition
* @param inflightInstant Inflight instant
* @param data Extra Metadata
* @return commit instant
*/
- public HoodieInstant transitionLogCompactionInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
+ public HoodieInstant transitionLogCompactionInflightToComplete(boolean shouldLock,
+     HoodieInstant inflightInstant, Option<byte[]> data) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.LOG_COMPACTION_ACTION));
ValidationUtils.checkArgument(inflightInstant.isInflight());
HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, DELTA_COMMIT_ACTION, inflightInstant.getTimestamp());
- transitionState(inflightInstant, commitInstant, data);
+ transitionStateToComplete(shouldLock, inflightInstant, commitInstant, data);
Review Comment:
Yes, we need to take the lock and generate a completion time only for
completed instants.
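A rough sketch of that split, assuming completion times only need to be strictly increasing within one process: pending transitions are plain writes, while completing an instant holds a lock while generating the completion time. `CompletionTimeSketch`, its file-name strings and its millisecond-based times are hypothetical; in this PR the completion time comes from `metaClient.createNewInstantTime(false)`.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch (not Hudi code): only the transition to COMPLETED takes
// the lock and assigns a completion time.
final class CompletionTimeSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private long lastCompletionTime = 0L; // guarded by lock

  // Pending state (requested -> inflight): no lock, no completion time.
  String transitionPending(String instantTime, String action) {
    return instantTime + "." + action + ".inflight";
  }

  // Completed state: the lock is held while generating the completion time,
  // so concurrent writers cannot obtain the same or an older value.
  String transitionToComplete(String instantTime, String action) {
    lock.lock();
    try {
      long completion = Math.max(System.currentTimeMillis(), lastCompletionTime + 1);
      lastCompletionTime = completion;
      return instantTime + "_" + completion + "." + action;
    } finally {
      lock.unlock();
    }
  }
}
```

Keeping the lock out of the pending transitions limits contention to the one step where ordering of completion times matters.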
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import com.github.benmanes.caffeine.cache.Cache;
Review Comment:
It's not a new dependency: `InternalSchemaCache` already uses it, and it lives
in hudi-common.