This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new e6156e2f [FLINK-30410] Rename 'full' to 'latest-full' and 'compacted' to 'compacted-full' for scan.mode in Table Store
e6156e2f is described below
commit e6156e2f2998a5c7227f0a40512f03312f041e0a
Author: tsreaper <[email protected]>
AuthorDate: Mon Dec 19 10:38:17 2022 +0800
[FLINK-30410] Rename 'full' to 'latest-full' and 'compacted' to 'compacted-full' for scan.mode in Table Store
This closes #439.
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../store/connector/source/FlinkSourceBuilder.java | 2 +-
.../table/store/connector/BatchFileStoreITCase.java | 3 ++-
.../connector/FullCompactionFileStoreITCase.java | 5 +++--
.../org/apache/flink/table/store/CoreOptions.java | 21 ++++++++++++---------
.../source/snapshot/CompactedStartingScanner.java | 2 +-
.../ContinuousDataFileSnapshotEnumerator.java | 6 +++---
.../table/source/snapshot/FullStartingScanner.java | 4 ++--
.../snapshot/StaticDataFileSnapshotEnumerator.java | 4 ++--
.../apache/flink/table/store/CoreOptionsTest.java | 11 ++++++++++-
.../table/store/kafka/KafkaLogSourceProvider.java | 2 +-
11 files changed, 38 insertions(+), 24 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index ed3316f3..369aea41 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -192,7 +192,7 @@
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
<td><p>Enum</p></td>
- <td>Specify the scanning behavior of the source.<br /><br
/>Possible values:<ul><li>"default": Determines actual startup mode according
to other table properties. If "scan.timestamp-millis" is set the actual startup
mode will be "from-timestamp". Otherwise the actual startup mode will be
"full".</li><li>"full": For streaming sources, produces the latest snapshot on
the table upon first startup, and continue to read the latest changes. For
batch sources, just produce the lates [...]
+ <td>Specify the scanning behavior of the source.<br /><br
/>Possible values:<ul><li>"default": Determines actual startup mode according
to other table properties. If "scan.timestamp-millis" is set the actual startup
mode will be "from-timestamp". Otherwise the actual startup mode will be
"latest-full".</li><li>"latest-full": For streaming sources, produces the
latest snapshot on the table upon first startup, and continue to read the
latest changes. For batch sources, just pro [...]
</tr>
<tr>
<td><h5>scan.timestamp-millis</h5></td>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index 5f40ae8b..2be92632 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -125,7 +125,7 @@ public class FlinkSourceBuilder {
if (logSourceProvider == null) {
return buildContinuousFileSource();
} else {
- if (startupMode != StartupMode.FULL) {
+ if (startupMode != StartupMode.LATEST_FULL) {
return logSourceProvider.createSource(null);
}
return HybridSource.<RowData,
StaticFileStoreSplitEnumerator>builder(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
index adb21bb9..131ddd99 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
@@ -47,6 +47,7 @@ public class BatchFileStoreITCase extends
FileStoreTableITCase {
@Test
public void testCompactedScanModeEmpty() {
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
- assertThat(batchSql("SELECT * FROM T /*+
OPTIONS('scan.mode'='compacted') */")).isEmpty();
+ assertThat(batchSql("SELECT * FROM T /*+
OPTIONS('scan.mode'='compacted-full') */"))
+ .isEmpty();
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
index 47fb004e..d66ce138 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
@@ -62,7 +62,8 @@ public class FullCompactionFileStoreITCase extends
CatalogITCaseBase {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter(
- "SELECT * FROM %s /*+
OPTIONS('scan.mode'='compacted') */", table));
+ "SELECT * FROM %s /*+
OPTIONS('scan.mode'='compacted-full') */",
+ table));
sql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
assertThat(iterator.collect(2))
@@ -71,7 +72,7 @@ public class FullCompactionFileStoreITCase extends
CatalogITCaseBase {
sql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7",
"8", "9"));
- assertThat(sql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted')
*/"))
+ assertThat(sql("SELECT * FROM T /*+
OPTIONS('scan.mode'='compacted-full') */"))
.containsExactlyInAnyOrder(
Row.of("1", "2", "3"), Row.of("4", "5", "6"),
Row.of("7", "8", "9"));
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index aa8d40f5..280a1ab2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -544,8 +544,10 @@ public class CoreOptions implements Serializable {
if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) {
return StartupMode.FROM_TIMESTAMP;
} else {
- return StartupMode.FULL;
+ return StartupMode.LATEST_FULL;
}
+ } else if (mode == StartupMode.FULL) {
+ return StartupMode.LATEST_FULL;
} else {
return mode;
}
@@ -604,22 +606,24 @@ public class CoreOptions implements Serializable {
"default",
"Determines actual startup mode according to other table
properties. "
+ "If \"scan.timestamp-millis\" is set the actual
startup mode will be \"from-timestamp\". "
- + "Otherwise the actual startup mode will be
\"full\"."),
+ + "Otherwise the actual startup mode will be
\"latest-full\"."),
- FULL(
- "full",
+ LATEST_FULL(
+ "latest-full",
"For streaming sources, produces the latest snapshot on the
table upon first startup, "
+ "and continue to read the latest changes. "
+ "For batch sources, just produce the latest snapshot
but does not read new changes."),
+ FULL("full", "Deprecated. Same as \"latest-full\"."),
+
LATEST(
"latest",
"For streaming sources, continuously reads latest changes "
+ "without producing a snapshot at the beginning. "
- + "For batch sources, behaves the same as the \"full\"
startup mode."),
+ + "For batch sources, behaves the same as the
\"latest-full\" startup mode."),
- COMPACTED(
- "compacted",
+ COMPACTED_FULL(
+ "compacted-full",
"For streaming sources, produces a snapshot after the latest
compaction on the table "
+ "upon first startup, and continue to read the latest
changes. "
+ "For batch sources, just produce a snapshot after
the latest compaction "
@@ -630,8 +634,7 @@ public class CoreOptions implements Serializable {
"For streaming sources, continuously reads changes "
+ "starting from timestamp specified by
\"scan.timestamp-millis\", "
+ "without producing a snapshot at the beginning. "
- + "For batch sources, produces a snapshot at timestamp
specified by \"scan.timestamp-millis\" "
- + "but does not read new changes.");
+ + "Unsupported for batch sources.");
private final String value;
private final String description;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java
index 1eea117c..6e7d1b46 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
/**
* {@link StartingScanner} for the {@link
- * org.apache.flink.table.store.CoreOptions.StartupMode#COMPACTED} startup
mode.
+ * org.apache.flink.table.store.CoreOptions.StartupMode#COMPACTED_FULL}
startup mode.
*/
public class CompactedStartingScanner implements StartingScanner {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index c53a4244..ba69827d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -112,7 +112,7 @@ public class ContinuousDataFileSnapshotEnumerator
implements SnapshotEnumerator
public static ContinuousDataFileSnapshotEnumerator
createWithSnapshotStarting(
DataTable table, DataTableScan scan) {
StartingScanner startingScanner =
- table.options().startupMode() ==
CoreOptions.StartupMode.COMPACTED
+ table.options().startupMode() ==
CoreOptions.StartupMode.COMPACTED_FULL
? new CompactedStartingScanner()
: new FullStartingScanner();
return new ContinuousDataFileSnapshotEnumerator(
@@ -132,11 +132,11 @@ public class ContinuousDataFileSnapshotEnumerator
implements SnapshotEnumerator
private static StartingScanner createStartingScanner(DataTable table) {
CoreOptions.StartupMode startupMode = table.options().startupMode();
Long startupMillis = table.options().logScanTimestampMills();
- if (startupMode == CoreOptions.StartupMode.FULL) {
+ if (startupMode == CoreOptions.StartupMode.LATEST_FULL) {
return new FullStartingScanner();
} else if (startupMode == CoreOptions.StartupMode.LATEST) {
return new ContinuousLatestStartingScanner();
- } else if (startupMode == CoreOptions.StartupMode.COMPACTED) {
+ } else if (startupMode == CoreOptions.StartupMode.COMPACTED_FULL) {
return new CompactedStartingScanner();
} else if (startupMode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
Preconditions.checkNotNull(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java
index ccb4a4b4..12ed3979 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java
@@ -26,8 +26,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * {@link StartingScanner} for the {@link
org.apache.flink.table.store.CoreOptions.StartupMode#FULL}
- * startup mode.
+ * {@link StartingScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#LATEST_FULL} startup
mode.
*/
public class FullStartingScanner implements StartingScanner {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
index 251ad1bf..4c23caf8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
@@ -64,10 +64,10 @@ public class StaticDataFileSnapshotEnumerator implements
SnapshotEnumerator {
public static StaticDataFileSnapshotEnumerator create(DataTable table,
DataTableScan scan) {
CoreOptions.StartupMode startupMode = table.options().startupMode();
StartingScanner startingScanner;
- if (startupMode == CoreOptions.StartupMode.FULL
+ if (startupMode == CoreOptions.StartupMode.LATEST_FULL
|| startupMode == CoreOptions.StartupMode.LATEST) {
startingScanner = new FullStartingScanner();
- } else if (startupMode == CoreOptions.StartupMode.COMPACTED) {
+ } else if (startupMode == CoreOptions.StartupMode.COMPACTED_FULL) {
startingScanner = new CompactedStartingScanner();
} else {
throw new UnsupportedOperationException("Unknown startup mode " +
startupMode.name());
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java
index f7c91231..1bc2f11a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java
@@ -31,7 +31,8 @@ public class CoreOptionsTest {
public void testDefaultStartupMode() {
Configuration conf = new Configuration();
assertThat(conf.get(CoreOptions.SCAN_MODE)).isEqualTo(CoreOptions.StartupMode.DEFAULT);
- assertThat(new
CoreOptions(conf).startupMode()).isEqualTo(CoreOptions.StartupMode.FULL);
+ assertThat(new CoreOptions(conf).startupMode())
+ .isEqualTo(CoreOptions.StartupMode.LATEST_FULL);
conf = new Configuration();
conf.set(CoreOptions.SCAN_TIMESTAMP_MILLIS,
System.currentTimeMillis());
@@ -50,4 +51,12 @@ public class CoreOptionsTest {
assertThat(new CoreOptions(conf).startupMode())
.isEqualTo(CoreOptions.StartupMode.FROM_TIMESTAMP);
}
+
+ @Test
+ public void testDeprecatedStartupMode() {
+ Configuration conf = new Configuration();
+ conf.set(CoreOptions.SCAN_MODE, CoreOptions.StartupMode.FULL);
+ assertThat(new CoreOptions(conf).startupMode())
+ .isEqualTo(CoreOptions.StartupMode.LATEST_FULL);
+ }
}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
index ba3ab9e8..a5658d92 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -122,7 +122,7 @@ public class KafkaLogSourceProvider implements
LogSourceProvider {
private OffsetsInitializer toOffsetsInitializer(@Nullable Map<Integer,
Long> bucketOffsets) {
switch (scanMode) {
- case FULL:
+ case LATEST_FULL:
return bucketOffsets == null
? OffsetsInitializer.earliest()
:
OffsetsInitializer.offsets(toKafkaOffsets(bucketOffsets));