yuzelin commented on code in PR #576:
URL: https://github.com/apache/flink-table-store/pull/576#discussion_r1127319167
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java:
##########
@@ -353,12 +353,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Optional snapshot id used in case of \"from-snapshot\" scan mode");
- public static final ConfigOption<Duration> LOG_RETENTION =
- key("log.retention")
- .durationType()
+ public static final ConfigOption<Long> SCAN_BOUNDED_WATERMARK =
+ key("scan.bounded.watermark")
+ .longType()
.noDefaultValue()
.withDescription(
- "It means how long changes log will be kept. The default value is from the log system cluster.");
+ "End condition \"watermark\" for bounded streaming mode. Stream"
+ + " reading will end until a larger watermark snapshot is encountered.");
Review Comment:
The description is wrong: 'will end until' should be either 'won't end until' or 'will end when'.
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumerator.java:
##########
@@ -18,22 +18,78 @@
package org.apache.flink.table.store.table.source.snapshot;
-import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+
/** Enumerate incremental changes from newly created snapshots. */
public interface SnapshotEnumerator {
/**
- * The first call to this method will produce a {@link DataTableScan.DataFilePlan} containing
- * the base files for the following incremental changes (or just return null if there are no
- * base files).
+ * The first call to this method will produce a {@link DataFilePlan} containing the base files
+ * for the following incremental changes (or just return null if there are no base files).
*
- * <p>Following calls to this method will produce {@link DataTableScan.DataFilePlan}s containing
- * incremental changed files. If there is currently no newer snapshots, null will be returned
- * instead.
+ * <p>Following calls to this method will produce {@link DataFilePlan}s containing incremental
+ * changed files. If there is currently no newer snapshots, null will be returned instead.
Review Comment:
This Javadoc needs to be updated:
1. Should it say that the method produces a `Result`?
2. 'return null' -> 'return `FinishedResult`'
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumerator.java:
##########
@@ -18,22 +18,78 @@
package org.apache.flink.table.store.table.source.snapshot;
-import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+
/** Enumerate incremental changes from newly created snapshots. */
public interface SnapshotEnumerator {
/**
- * The first call to this method will produce a {@link DataTableScan.DataFilePlan} containing
- * the base files for the following incremental changes (or just return null if there are no
- * base files).
+ * The first call to this method will produce a {@link DataFilePlan} containing the base files
+ * for the following incremental changes (or just return null if there are no base files).
*
- * <p>Following calls to this method will produce {@link DataTableScan.DataFilePlan}s containing
- * incremental changed files. If there is currently no newer snapshots, null will be returned
- * instead.
+ * <p>Following calls to this method will produce {@link DataFilePlan}s containing incremental
+ * changed files. If there is currently no newer snapshots, null will be returned instead.
*/
- @Nullable
- DataTableScan.DataFilePlan enumerate();
+ Result enumerate();
+
+ default Result planResult(@Nullable DataFilePlan plan) {
+ return new PlanResult(plan);
+ }
+
+ default Result finishedResult() {
+ return new FinishedResult();
+ }
+
+ default List<DataFilePlan> enumerateAll() {
+ List<DataFilePlan> plans = new ArrayList<>();
+ while (true) {
+ Result result = enumerate();
+ if (result instanceof FinishedResult) {
+ break;
+ }
+ DataFilePlan plan = result.plan();
+ if (plan == null) {
+ continue;
+ }
+ plans.add(plan);
+ }
+ return plans;
+ }
+
+ /** Result of enumeration. */
+ interface Result {
+ @Nullable
+ DataFilePlan plan();
+ }
+
+ /** Result for a plan. */
+ class PlanResult implements Result {
+
+ private final DataFilePlan plan;
+
+ public PlanResult(DataFilePlan plan) {
+ this.plan = plan;
+ }
+
+ @Nullable
+ @Override
+ public DataFilePlan plan() {
+ return plan;
+ }
+ }
+
+ /** Result for finished. */
+ class FinishedResult implements Result {
+
+ @Nullable
+ @Override
+ public DataFilePlan plan() {
+ throw new RuntimeException("This is a finished result.");
+ }
+ }
Review Comment:
I think we could use a shared constant instead of a dedicated class, e.g.
`public static final Result FINISHED_RESULT = new Result() {...}`.
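
A minimal sketch of that idea, not the PR's actual code. `FinishedResultSketch`, the stub `DataFilePlan`, and the `fromResults` helper are hypothetical names introduced only to keep the example self-contained. The constant lives inside the `Result` interface here, so it is implicitly `public static final`, and callers compare by identity instead of using `instanceof`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class FinishedResultSketch {

    /** Hypothetical stand-in for DataTableScan.DataFilePlan. */
    static final class DataFilePlan {
        final long snapshotId;

        DataFilePlan(long snapshotId) {
            this.snapshotId = snapshotId;
        }
    }

    /** Result of enumeration. */
    interface Result {
        /** Shared sentinel replacing the FinishedResult class (assumed placement). */
        Result FINISHED_RESULT =
                new Result() {
                    @Override
                    public DataFilePlan plan() {
                        throw new RuntimeException("This is a finished result.");
                    }
                };

        /** May return null when there is currently nothing to read. */
        DataFilePlan plan();
    }

    /** Result for a plan. */
    static final class PlanResult implements Result {
        private final DataFilePlan plan;

        PlanResult(DataFilePlan plan) {
            this.plan = plan;
        }

        @Override
        public DataFilePlan plan() {
            return plan;
        }
    }

    interface SnapshotEnumerator {
        Result enumerate();

        default List<DataFilePlan> enumerateAll() {
            List<DataFilePlan> plans = new ArrayList<>();
            Result result;
            // Identity comparison against the sentinel ends the loop.
            while ((result = enumerate()) != Result.FINISHED_RESULT) {
                DataFilePlan plan = result.plan();
                if (plan != null) {
                    plans.add(plan);
                }
            }
            return plans;
        }
    }

    /** Hypothetical helper: replays the given results, then reports finished. */
    static SnapshotEnumerator fromResults(Result... results) {
        Iterator<Result> it = Arrays.asList(results).iterator();
        return () -> it.hasNext() ? it.next() : Result.FINISHED_RESULT;
    }

    public static void main(String[] args) {
        SnapshotEnumerator enumerator =
                fromResults(
                        new PlanResult(new DataFilePlan(1)),
                        new PlanResult(null),
                        new PlanResult(new DataFilePlan(2)));
        System.out.println(enumerator.enumerateAll().size());
    }
}
```

With a single sentinel there is no per-call allocation for the finished case, and `planResult`/`finishedResult` style factory methods become optional.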
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java:
##########
@@ -63,6 +75,10 @@ public long identifier() {
return identifier;
}
+ public Long watermark() {
+ return watermark;
+ }
Review Comment:
Please add `@Nullable` to this method and to the field `watermark`.
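
Roughly what that would look like, as a sketch only. The constructor signature is an assumption, and the local `Nullable` annotation is a stand-in for `javax.annotation.Nullable` so the snippet compiles without extra dependencies.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class NullableWatermarkSketch {

    /** Local stand-in for javax.annotation.Nullable (assumption for self-containment). */
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
    @interface Nullable {}

    /** Simplified stand-in for ManifestCommittable; only the relevant members are shown. */
    static final class ManifestCommittable {
        private final long identifier;

        // Annotated because a committable may carry no watermark.
        @Nullable private final Long watermark;

        // Hypothetical constructor; the real one is not shown in the diff hunk.
        ManifestCommittable(long identifier, @Nullable Long watermark) {
            this.identifier = identifier;
            this.watermark = watermark;
        }

        public long identifier() {
            return identifier;
        }

        @Nullable
        public Long watermark() {
            return watermark;
        }
    }

    public static void main(String[] args) {
        ManifestCommittable committable = new ManifestCommittable(1L, null);
        Long watermark = committable.watermark();
        // Callers are expected to null-check before unboxing.
        System.out.println(watermark == null ? "no watermark" : "watermark=" + watermark);
    }
}
```

The annotation documents that unboxing the return value directly (for example `long w = committable.watermark();`) can throw a `NullPointerException`.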