garyli1019 commented on a change in pull request #3637:
URL: https://github.com/apache/hudi/pull/3637#discussion_r706047942
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
##########
@@ -18,9 +18,16 @@
package org.apache.hudi.sink.compact;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
Review comment:
The import order is wrong here, so checkstyle should fail. Please
follow the order: hudi -> 3rd party -> java/scala -> static.
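For reference, the expected grouping can be sketched like this (an illustration only; the class names are taken from the diffs in this PR, and a blank line separates each group):

```java
// 1. org.apache.hudi imports first
import org.apache.hudi.util.StreamerUtil;

// 2. third-party imports (Flink, JCommander, slf4j, ...)
import org.apache.flink.configuration.Configuration;

// 3. java/scala standard-library imports
import java.util.concurrent.TimeUnit;

// 4. static imports last
```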
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
##########
@@ -52,7 +52,7 @@
import java.util.Objects;
/**
- * The function to build the write profile incrementally for records within a checkpoint,
+ * The function to build the write phudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.javarofile incrementally for records within a checkpoint,
Review comment:
Accident? Looks like a file path got pasted into the middle of the word "profile".
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
##########
@@ -18,6 +18,10 @@
package org.apache.hudi.util;
+import org.apache.flink.configuration.Configuration;
Review comment:
ditto
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
##########
@@ -430,14 +430,14 @@ private FlinkOptions() {
public static final ConfigOption<Boolean> COMPACTION_SCHEDULE_ENABLED =
ConfigOptions
.key("compaction.schedule.enabled")
.booleanType()
- .defaultValue(true) // default true for MOR write
- .withDescription("Schedule the compaction plan, enabled by default for MOR");
+ .defaultValue(false) // default false for MOR write
Review comment:
@danny0405 WDYT of changing the default value? We could make running all
compaction and cleaning in a separate batch job the recommended way.
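For context, flipping the built-in default does not stop users from enabling scheduling explicitly. A minimal, self-contained sketch of that default/override behavior (plain `java.util.Properties` standing in for Flink's `Configuration`; only the key `compaction.schedule.enabled` is taken from the diff above, the class and method names are hypothetical):

```java
import java.util.Properties;

public class ScheduleDefaultSketch {
    // Returns the effective value of compaction.schedule.enabled,
    // falling back to the proposed new default (false) when unset.
    static boolean scheduleEnabled(Properties conf) {
        return Boolean.parseBoolean(
            conf.getProperty("compaction.schedule.enabled", "false"));
    }

    public static void main(String[] args) {
        Properties unset = new Properties();
        System.out.println(scheduleEnabled(unset)); // unset -> new default, false

        Properties explicit = new Properties();
        explicit.setProperty("compaction.schedule.enabled", "true");
        System.out.println(scheduleEnabled(explicit)); // explicit user value wins
    }
}
```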
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
##########
@@ -93,7 +93,7 @@
+ "There is a risk of losing data when scheduling compaction outside the writer job.\n"
Review comment:
There is no risk anymore, so we could update this description now. Async
compaction scheduling is actually the recommended approach.
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
##########
@@ -29,15 +36,11 @@
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
-
-import com.beust.jcommander.JCommander;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
Review comment:
ditto
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
##########
@@ -50,18 +50,15 @@
* Gets compaction Instant time.
*/
public static String getCompactionInstantTime(HoodieTableMetaClient metaClient) {
- Option<HoodieInstant> firstPendingInstant = metaClient.getCommitsTimeline()
+ Option<HoodieInstant> firstPendingInstant = metaClient.getActiveTimeline()
.filterPendingExcludingCompaction().firstInstant();
- Option<HoodieInstant> lastCompleteInstant = metaClient.getActiveTimeline().getWriteTimeline()
- .filterCompletedAndCompactionInstants().lastInstant();
- if (firstPendingInstant.isPresent() && lastCompleteInstant.isPresent()) {
+ if (firstPendingInstant.isPresent()) {
String firstPendingTimestamp = firstPendingInstant.get().getTimestamp();
- String lastCompleteTimestamp = lastCompleteInstant.get().getTimestamp();
- // Committed and pending compaction instants should have strictly lower timestamps
- return StreamerUtil.medianInstantTime(firstPendingTimestamp, lastCompleteTimestamp);
+ return StreamerUtil.incrementInstantTime(firstPendingTimestamp, 1);
} else {
return HoodieActiveTimeline.createNewInstantTime();
}
+
Review comment:
ditto
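The revised logic in this diff derives the compaction instant time by bumping the first pending instant's timestamp by one step, instead of taking the median between it and the last completed instant. A self-contained sketch of that increment (a hypothetical re-implementation; the real `StreamerUtil.incrementInstantTime` may differ, e.g. by parsing the string as a date), assuming Hudi's fixed-width numeric `yyyyMMddHHmmss` instant-time strings:

```java
public class InstantTimeSketch {
    /**
     * Hypothetical sketch of StreamerUtil.incrementInstantTime: treating the
     * fixed-width numeric instant time as a long and adding `step` yields a
     * strictly greater, same-width timestamp string.
     */
    static String incrementInstantTime(String timestamp, int step) {
        return String.format("%0" + timestamp.length() + "d",
            Long.parseLong(timestamp) + step);
    }

    public static void main(String[] args) {
        // The scheduled compaction instant lands just after the first pending instant.
        System.out.println(incrementInstantTime("20210909120000", 1)); // 20210909120001
    }
}
```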
##########
File path:
hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
##########
@@ -35,6 +35,9 @@
protected void setUp(Configuration conf) {
// trigger the compaction for every finished checkpoint
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+ conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true);
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+
Review comment:
Unnecessary whitespace.
##########
File path: hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
##########
@@ -24,6 +24,7 @@
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
+
Review comment:
Unnecessary whitespace.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]