leonardBang commented on code in PR #2185:
URL: https://github.com/apache/fluss/pull/2185#discussion_r2672046461
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java:
##########
@@ -38,13 +38,15 @@ public abstract class TieringSplit implements SourceSplit {
protected final TableBucket tableBucket;
@Nullable protected final String partitionName;
+ protected boolean forceIgnore;
// the total number of splits in one round of tiering
protected final int numberOfSplits;
public TieringSplit(
TablePath tablePath,
TableBucket tableBucket,
@Nullable String partitionName,
+ boolean forceIgnore,
int numberOfSplits) {
Review Comment:
```suggestion
int numberOfSplits,
boolean forceIgnore) {
```
Add new parameter to last position is recommended
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java:
##########
@@ -128,11 +139,12 @@ public boolean equals(Object object) {
return Objects.equals(tablePath, that.tablePath)
&& Objects.equals(tableBucket, that.tableBucket)
&& Objects.equals(partitionName, that.partitionName)
+ && forceIgnore == that.forceIgnore
&& numberOfSplits == that.numberOfSplits;
}
@Override
public int hashCode() {
- return Objects.hash(tablePath, tableBucket, partitionName,
numberOfSplits);
+ return Objects.hash(tablePath, tableBucket, partitionName,
forceIgnore, numberOfSplits);
Review Comment:
minor: add to last position.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java:
##########
@@ -126,6 +129,8 @@ public TieringSplit deserialize(int version, byte[]
serialized) throws IOExcepti
}
TableBucket tableBucket = new TableBucket(tableId, partitionId,
bucketId);
+ boolean forceIgnore = in.readBoolean();
Review Comment:
hint: we can add a note we do not need consider compatibility here.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java:
##########
@@ -54,6 +56,7 @@ public TieringSplit(
throw new IllegalArgumentException(
"Partition name and partition id must be both null or both
not null.");
}
+ this.forceIgnore = forceIgnore;
Review Comment:
`forceIgnore` is not clear especially with any explanation, here we want to
use a member to represent `how to read` of TieringSplit, IIUC, the meaning of
this `status` is skip reading data for current split, how about
`skipCurrentRound` and add java doc for method or member?
##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -1811,6 +1811,23 @@ public class ConfigOptions {
+
ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT
+ " is false.");
+ public static final ConfigOption<Duration> LAKE_TIERING_TABLE_DURATION_MAX
=
+ key("lake.tiering.table.duration.max")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(30))
+ .withDescription(
+ "The maximum duration for tiering a single table.
If tiering a table exceeds this duration, "
+ + "it will be force completed: the tiering
will be finalized and committed to the data lake "
Review Comment:
after the table wast force completed, would it be tired again for next round
?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]