This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e6156e2f [FLINK-30410] Rename 'full' to 'latest-full' and 'compacted' to 'compacted-full' for scan.mode in Table Store
e6156e2f is described below

commit e6156e2f2998a5c7227f0a40512f03312f041e0a
Author: tsreaper <[email protected]>
AuthorDate: Mon Dec 19 10:38:17 2022 +0800

    [FLINK-30410] Rename 'full' to 'latest-full' and 'compacted' to 'compacted-full' for scan.mode in Table Store
    
    This closes #439.
---
 .../shortcodes/generated/core_configuration.html    |  2 +-
 .../store/connector/source/FlinkSourceBuilder.java  |  2 +-
 .../table/store/connector/BatchFileStoreITCase.java |  3 ++-
 .../connector/FullCompactionFileStoreITCase.java    |  5 +++--
 .../org/apache/flink/table/store/CoreOptions.java   | 21 ++++++++++++---------
 .../source/snapshot/CompactedStartingScanner.java   |  2 +-
 .../ContinuousDataFileSnapshotEnumerator.java       |  6 +++---
 .../table/source/snapshot/FullStartingScanner.java  |  4 ++--
 .../snapshot/StaticDataFileSnapshotEnumerator.java  |  4 ++--
 .../apache/flink/table/store/CoreOptionsTest.java   | 11 ++++++++++-
 .../table/store/kafka/KafkaLogSourceProvider.java   |  2 +-
 11 files changed, 38 insertions(+), 24 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index ed3316f3..369aea41 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -192,7 +192,7 @@
             <td><h5>scan.mode</h5></td>
             <td style="word-wrap: break-word;">default</td>
             <td><p>Enum</p></td>
-            <td>Specify the scanning behavior of the source.<br /><br 
/>Possible values:<ul><li>"default": Determines actual startup mode according 
to other table properties. If "scan.timestamp-millis" is set the actual startup 
mode will be "from-timestamp". Otherwise the actual startup mode will be 
"full".</li><li>"full": For streaming sources, produces the latest snapshot on 
the table upon first startup, and continue to read the latest changes. For 
batch sources, just produce the lates [...]
+            <td>Specify the scanning behavior of the source.<br /><br 
/>Possible values:<ul><li>"default": Determines actual startup mode according 
to other table properties. If "scan.timestamp-millis" is set the actual startup 
mode will be "from-timestamp". Otherwise the actual startup mode will be 
"latest-full".</li><li>"latest-full": For streaming sources, produces the 
latest snapshot on the table upon first startup, and continue to read the 
latest changes. For batch sources, just pro [...]
         </tr>
         <tr>
             <td><h5>scan.timestamp-millis</h5></td>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index 5f40ae8b..2be92632 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -125,7 +125,7 @@ public class FlinkSourceBuilder {
             if (logSourceProvider == null) {
                 return buildContinuousFileSource();
             } else {
-                if (startupMode != StartupMode.FULL) {
+                if (startupMode != StartupMode.LATEST_FULL) {
                     return logSourceProvider.createSource(null);
                 }
                 return HybridSource.<RowData, 
StaticFileStoreSplitEnumerator>builder(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
index adb21bb9..131ddd99 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
@@ -47,6 +47,7 @@ public class BatchFileStoreITCase extends 
FileStoreTableITCase {
     @Test
     public void testCompactedScanModeEmpty() {
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
-        assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.mode'='compacted') */")).isEmpty();
+        assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.mode'='compacted-full') */"))
+                .isEmpty();
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
index 47fb004e..d66ce138 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
@@ -62,7 +62,8 @@ public class FullCompactionFileStoreITCase extends 
CatalogITCaseBase {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(
                         streamSqlIter(
-                                "SELECT * FROM %s /*+ 
OPTIONS('scan.mode'='compacted') */", table));
+                                "SELECT * FROM %s /*+ 
OPTIONS('scan.mode'='compacted-full') */",
+                                table));
 
         sql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
         assertThat(iterator.collect(2))
@@ -71,7 +72,7 @@ public class FullCompactionFileStoreITCase extends 
CatalogITCaseBase {
         sql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", 
"8", "9"));
 
-        assertThat(sql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted') 
*/"))
+        assertThat(sql("SELECT * FROM T /*+ 
OPTIONS('scan.mode'='compacted-full') */"))
                 .containsExactlyInAnyOrder(
                         Row.of("1", "2", "3"), Row.of("4", "5", "6"), 
Row.of("7", "8", "9"));
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index aa8d40f5..280a1ab2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -544,8 +544,10 @@ public class CoreOptions implements Serializable {
             if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) {
                 return StartupMode.FROM_TIMESTAMP;
             } else {
-                return StartupMode.FULL;
+                return StartupMode.LATEST_FULL;
             }
+        } else if (mode == StartupMode.FULL) {
+            return StartupMode.LATEST_FULL;
         } else {
             return mode;
         }
@@ -604,22 +606,24 @@ public class CoreOptions implements Serializable {
                 "default",
                 "Determines actual startup mode according to other table 
properties. "
                         + "If \"scan.timestamp-millis\" is set the actual 
startup mode will be \"from-timestamp\". "
-                        + "Otherwise the actual startup mode will be 
\"full\"."),
+                        + "Otherwise the actual startup mode will be 
\"latest-full\"."),
 
-        FULL(
-                "full",
+        LATEST_FULL(
+                "latest-full",
                 "For streaming sources, produces the latest snapshot on the 
table upon first startup, "
                         + "and continue to read the latest changes. "
                         + "For batch sources, just produce the latest snapshot 
but does not read new changes."),
 
+        FULL("full", "Deprecated. Same as \"latest-full\"."),
+
         LATEST(
                 "latest",
                 "For streaming sources, continuously reads latest changes "
                         + "without producing a snapshot at the beginning. "
-                        + "For batch sources, behaves the same as the \"full\" 
startup mode."),
+                        + "For batch sources, behaves the same as the 
\"latest-full\" startup mode."),
 
-        COMPACTED(
-                "compacted",
+        COMPACTED_FULL(
+                "compacted-full",
                 "For streaming sources, produces a snapshot after the latest 
compaction on the table "
                         + "upon first startup, and continue to read the latest 
changes. "
                         + "For batch sources, just produce a snapshot after 
the latest compaction "
@@ -630,8 +634,7 @@ public class CoreOptions implements Serializable {
                 "For streaming sources, continuously reads changes "
                         + "starting from timestamp specified by 
\"scan.timestamp-millis\", "
                         + "without producing a snapshot at the beginning. "
-                        + "For batch sources, produces a snapshot at timestamp 
specified by \"scan.timestamp-millis\" "
-                        + "but does not read new changes.");
+                        + "Unsupported for batch sources.");
 
         private final String value;
         private final String description;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java
index 1eea117c..6e7d1b46 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * {@link StartingScanner} for the {@link
- * org.apache.flink.table.store.CoreOptions.StartupMode#COMPACTED} startup 
mode.
+ * org.apache.flink.table.store.CoreOptions.StartupMode#COMPACTED_FULL} 
startup mode.
  */
 public class CompactedStartingScanner implements StartingScanner {
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index c53a4244..ba69827d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -112,7 +112,7 @@ public class ContinuousDataFileSnapshotEnumerator 
implements SnapshotEnumerator
     public static ContinuousDataFileSnapshotEnumerator 
createWithSnapshotStarting(
             DataTable table, DataTableScan scan) {
         StartingScanner startingScanner =
-                table.options().startupMode() == 
CoreOptions.StartupMode.COMPACTED
+                table.options().startupMode() == 
CoreOptions.StartupMode.COMPACTED_FULL
                         ? new CompactedStartingScanner()
                         : new FullStartingScanner();
         return new ContinuousDataFileSnapshotEnumerator(
@@ -132,11 +132,11 @@ public class ContinuousDataFileSnapshotEnumerator 
implements SnapshotEnumerator
     private static StartingScanner createStartingScanner(DataTable table) {
         CoreOptions.StartupMode startupMode = table.options().startupMode();
         Long startupMillis = table.options().logScanTimestampMills();
-        if (startupMode == CoreOptions.StartupMode.FULL) {
+        if (startupMode == CoreOptions.StartupMode.LATEST_FULL) {
             return new FullStartingScanner();
         } else if (startupMode == CoreOptions.StartupMode.LATEST) {
             return new ContinuousLatestStartingScanner();
-        } else if (startupMode == CoreOptions.StartupMode.COMPACTED) {
+        } else if (startupMode == CoreOptions.StartupMode.COMPACTED_FULL) {
             return new CompactedStartingScanner();
         } else if (startupMode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
             Preconditions.checkNotNull(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java
index ccb4a4b4..12ed3979 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java
@@ -26,8 +26,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * {@link StartingScanner} for the {@link 
org.apache.flink.table.store.CoreOptions.StartupMode#FULL}
- * startup mode.
+ * {@link StartingScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#LATEST_FULL} startup 
mode.
  */
 public class FullStartingScanner implements StartingScanner {
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
index 251ad1bf..4c23caf8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
@@ -64,10 +64,10 @@ public class StaticDataFileSnapshotEnumerator implements 
SnapshotEnumerator {
     public static StaticDataFileSnapshotEnumerator create(DataTable table, 
DataTableScan scan) {
         CoreOptions.StartupMode startupMode = table.options().startupMode();
         StartingScanner startingScanner;
-        if (startupMode == CoreOptions.StartupMode.FULL
+        if (startupMode == CoreOptions.StartupMode.LATEST_FULL
                 || startupMode == CoreOptions.StartupMode.LATEST) {
             startingScanner = new FullStartingScanner();
-        } else if (startupMode == CoreOptions.StartupMode.COMPACTED) {
+        } else if (startupMode == CoreOptions.StartupMode.COMPACTED_FULL) {
             startingScanner = new CompactedStartingScanner();
         } else {
             throw new UnsupportedOperationException("Unknown startup mode " + 
startupMode.name());
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java
index f7c91231..1bc2f11a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java
@@ -31,7 +31,8 @@ public class CoreOptionsTest {
     public void testDefaultStartupMode() {
         Configuration conf = new Configuration();
         
assertThat(conf.get(CoreOptions.SCAN_MODE)).isEqualTo(CoreOptions.StartupMode.DEFAULT);
-        assertThat(new 
CoreOptions(conf).startupMode()).isEqualTo(CoreOptions.StartupMode.FULL);
+        assertThat(new CoreOptions(conf).startupMode())
+                .isEqualTo(CoreOptions.StartupMode.LATEST_FULL);
 
         conf = new Configuration();
         conf.set(CoreOptions.SCAN_TIMESTAMP_MILLIS, 
System.currentTimeMillis());
@@ -50,4 +51,12 @@ public class CoreOptionsTest {
         assertThat(new CoreOptions(conf).startupMode())
                 .isEqualTo(CoreOptions.StartupMode.FROM_TIMESTAMP);
     }
+
+    @Test
+    public void testDeprecatedStartupMode() {
+        Configuration conf = new Configuration();
+        conf.set(CoreOptions.SCAN_MODE, CoreOptions.StartupMode.FULL);
+        assertThat(new CoreOptions(conf).startupMode())
+                .isEqualTo(CoreOptions.StartupMode.LATEST_FULL);
+    }
 }
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
index ba3ab9e8..a5658d92 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -122,7 +122,7 @@ public class KafkaLogSourceProvider implements 
LogSourceProvider {
 
     private OffsetsInitializer toOffsetsInitializer(@Nullable Map<Integer, 
Long> bucketOffsets) {
         switch (scanMode) {
-            case FULL:
+            case LATEST_FULL:
                 return bucketOffsets == null
                         ? OffsetsInitializer.earliest()
                         : 
OffsetsInitializer.offsets(toKafkaOffsets(bucketOffsets));

Reply via email to