This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 926f53b60 [flink] Add the compatibility check between StartupMode and LOG_SYSTEM (#876)
926f53b60 is described below

commit 926f53b602d30d57ac26f542aa618510bd89223d
Author: liming.1018 <[email protected]>
AuthorDate: Thu Apr 13 16:23:26 2023 +0800

    [flink] Add the compatibility check between StartupMode and LOG_SYSTEM (#876)
    
    * [flink] Add the compatibility check between StartupMode and LOG_SYSTEM.
    
    * change log message format
    
    ---------
    
    Co-authored-by: liming.1018 <[email protected]>
---
 .../paimon/flink/AbstractFlinkTableFactory.java    |  9 ++++
 .../flink/AbstractFlinkTableFactoryTest.java       | 53 ++++++++++++++++++++++
 .../paimon/flink/kafka/KafkaLogTestUtils.java      |  7 ++-
 3 files changed, 67 insertions(+), 2 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 73a9c0f7c..59666f4b0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -54,7 +54,10 @@ import java.util.Set;
 
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.paimon.CoreOptions.SCAN_MODE;
 import static org.apache.paimon.CoreOptions.STREAMING_READ_MODE;
+import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT;
+import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT_FULL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
@@ -118,6 +121,12 @@ public abstract class AbstractFlinkTableFactory
             // Use file store continuous reading
             validateFileStoreContinuous(configOptions);
             return Optional.empty();
+        } else if (configOptions.get(SCAN_MODE) == FROM_SNAPSHOT
+                || configOptions.get(SCAN_MODE) == FROM_SNAPSHOT_FULL) {
+            throw new ValidationException(
+                    String.format(
+                            "Log system does not support %s and %s scan mode",
+                            FROM_SNAPSHOT, FROM_SNAPSHOT_FULL));
         }
 
         return Optional.of(discoverLogStoreFactory(classLoader, configOptions.get(LOG_SYSTEM)));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
index 41958a016..b2e829dbd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
@@ -18,14 +18,31 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.flink.kafka.KafkaLogStoreFactory;
+import org.apache.paimon.flink.kafka.KafkaLogTestUtils;
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 
+import static org.apache.paimon.CoreOptions.LogChangelogMode;
+import static org.apache.paimon.CoreOptions.LogConsistency;
+import static org.apache.paimon.CoreOptions.SCAN_MODE;
+import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
+import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
+import static org.apache.paimon.CoreOptions.StartupMode;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link AbstractFlinkTableFactory}. */
@@ -53,6 +70,42 @@ public class AbstractFlinkTableFactoryTest {
                 true);
     }
 
+    @ParameterizedTest
+    @EnumSource(StartupMode.class)
+    public void testCreateKafkaLogStoreFactory(StartupMode startupMode) {
+        Map<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put(FlinkConnectorOptions.LOG_SYSTEM.key(), "kafka");
+        dynamicOptions.put(SCAN_MODE.key(), startupMode.toString());
+        if (startupMode == StartupMode.FROM_SNAPSHOT
+                || startupMode == StartupMode.FROM_SNAPSHOT_FULL) {
+            dynamicOptions.put(SCAN_SNAPSHOT_ID.key(), "1");
+        } else if (startupMode == StartupMode.FROM_TIMESTAMP) {
+            dynamicOptions.put(
+                    SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
+        }
+        dynamicOptions.put(SCAN_MODE.key(), startupMode.toString());
+        DynamicTableFactory.Context context =
+                KafkaLogTestUtils.testContext(
+                        "table",
+                        "",
+                        LogChangelogMode.AUTO,
+                        LogConsistency.TRANSACTIONAL,
+                        RowType.of(new IntType(), new IntType()),
+                        new int[] {0},
+                        dynamicOptions);
+
+        try {
+            Optional<LogStoreTableFactory> optional =
+                    AbstractFlinkTableFactory.createOptionalLogStoreFactory(context);
+            assertThat(startupMode)
+                    .isNotIn(StartupMode.FROM_SNAPSHOT, StartupMode.FROM_SNAPSHOT_FULL);
+            assertThat(optional.isPresent()).isTrue();
+            assertThat(optional.get()).isInstanceOf(KafkaLogStoreFactory.class);
+        } catch (ValidationException e) {
+            assertThat(startupMode).isIn(StartupMode.FROM_SNAPSHOT, StartupMode.FROM_SNAPSHOT_FULL);
+        }
+    }
+
     private void innerTest(RowType r1, RowType r2, boolean expectEquals) {
         assertThat(AbstractFlinkTableFactory.schemaEquals(r1, r2)).isEqualTo(expectEquals);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
index a1fa469a2..eb2201f15 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
@@ -178,7 +178,8 @@ public class KafkaLogTestUtils {
                 changelogMode,
                 consistency,
                 RowType.of(new IntType(), new IntType()),
-                keyed ? new int[] {0} : new int[0]);
+                keyed ? new int[] {0} : new int[0],
+                new HashMap<>());
     }
 
     public static DynamicTableFactory.Context testContext(
@@ -187,12 +188,14 @@ public class KafkaLogTestUtils {
             LogChangelogMode changelogMode,
             LogConsistency consistency,
             RowType type,
-            int[] keys) {
+            int[] keys,
+            Map<String, String> dynamicOptions) {
         Map<String, String> options = new HashMap<>();
         options.put(LOG_CHANGELOG_MODE.key(), changelogMode.toString());
         options.put(LOG_CONSISTENCY.key(), consistency.toString());
         options.put(BOOTSTRAP_SERVERS.key(), servers);
         options.put(TOPIC.key(), UUID.randomUUID().toString());
+        options.putAll(dynamicOptions);
         return createContext(name, type, keys, options);
     }
 

Reply via email to