This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 926f53b60 [flink] Add the compatibility check between StartupMode and
LOG_SYSTEM (#876)
926f53b60 is described below
commit 926f53b602d30d57ac26f542aa618510bd89223d
Author: liming.1018 <[email protected]>
AuthorDate: Thu Apr 13 16:23:26 2023 +0800
[flink] Add the compatibility check between StartupMode and LOG_SYSTEM
(#876)
* [flink] Add the compatibility check between StartupMode and LOG_SYSTEM.
* change log message format
---------
Co-authored-by: liming.1018 <[email protected]>
---
.../paimon/flink/AbstractFlinkTableFactory.java | 9 ++++
.../flink/AbstractFlinkTableFactoryTest.java | 53 ++++++++++++++++++++++
.../paimon/flink/kafka/KafkaLogTestUtils.java | 7 ++-
3 files changed, 67 insertions(+), 2 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 73a9c0f7c..59666f4b0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -54,7 +54,10 @@ import java.util.Set;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.STREAMING_READ_MODE;
+import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT;
+import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT_FULL;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
@@ -118,6 +121,12 @@ public abstract class AbstractFlinkTableFactory
// Use file store continuous reading
validateFileStoreContinuous(configOptions);
return Optional.empty();
+ } else if (configOptions.get(SCAN_MODE) == FROM_SNAPSHOT
+ || configOptions.get(SCAN_MODE) == FROM_SNAPSHOT_FULL) {
+ throw new ValidationException(
+ String.format(
+ "Log system does not support %s and %s scan mode",
+ FROM_SNAPSHOT, FROM_SNAPSHOT_FULL));
}
return Optional.of(discoverLogStoreFactory(classLoader,
configOptions.get(LOG_SYSTEM)));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
index 41958a016..b2e829dbd 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
@@ -18,14 +18,31 @@
package org.apache.paimon.flink;
+import org.apache.paimon.flink.kafka.KafkaLogStoreFactory;
+import org.apache.paimon.flink.kafka.KafkaLogTestUtils;
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import static org.apache.paimon.CoreOptions.LogChangelogMode;
+import static org.apache.paimon.CoreOptions.LogConsistency;
+import static org.apache.paimon.CoreOptions.SCAN_MODE;
+import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
+import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
+import static org.apache.paimon.CoreOptions.StartupMode;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link AbstractFlinkTableFactory}. */
@@ -53,6 +70,42 @@ public class AbstractFlinkTableFactoryTest {
true);
}
+ @ParameterizedTest
+ @EnumSource(StartupMode.class)
+ public void testCreateKafkaLogStoreFactory(StartupMode startupMode) {
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(FlinkConnectorOptions.LOG_SYSTEM.key(), "kafka");
+ dynamicOptions.put(SCAN_MODE.key(), startupMode.toString());
+ if (startupMode == StartupMode.FROM_SNAPSHOT
+ || startupMode == StartupMode.FROM_SNAPSHOT_FULL) {
+ dynamicOptions.put(SCAN_SNAPSHOT_ID.key(), "1");
+ } else if (startupMode == StartupMode.FROM_TIMESTAMP) {
+ dynamicOptions.put(
+ SCAN_TIMESTAMP_MILLIS.key(),
String.valueOf(System.currentTimeMillis()));
+ }
+ dynamicOptions.put(SCAN_MODE.key(), startupMode.toString());
+ DynamicTableFactory.Context context =
+ KafkaLogTestUtils.testContext(
+ "table",
+ "",
+ LogChangelogMode.AUTO,
+ LogConsistency.TRANSACTIONAL,
+ RowType.of(new IntType(), new IntType()),
+ new int[] {0},
+ dynamicOptions);
+
+ try {
+ Optional<LogStoreTableFactory> optional =
+
AbstractFlinkTableFactory.createOptionalLogStoreFactory(context);
+ assertThat(startupMode)
+ .isNotIn(StartupMode.FROM_SNAPSHOT,
StartupMode.FROM_SNAPSHOT_FULL);
+ assertThat(optional.isPresent()).isTrue();
+
assertThat(optional.get()).isInstanceOf(KafkaLogStoreFactory.class);
+ } catch (ValidationException e) {
+ assertThat(startupMode).isIn(StartupMode.FROM_SNAPSHOT,
StartupMode.FROM_SNAPSHOT_FULL);
+ }
+ }
+
private void innerTest(RowType r1, RowType r2, boolean expectEquals) {
assertThat(AbstractFlinkTableFactory.schemaEquals(r1,
r2)).isEqualTo(expectEquals);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
index a1fa469a2..eb2201f15 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
@@ -178,7 +178,8 @@ public class KafkaLogTestUtils {
changelogMode,
consistency,
RowType.of(new IntType(), new IntType()),
- keyed ? new int[] {0} : new int[0]);
+ keyed ? new int[] {0} : new int[0],
+ new HashMap<>());
}
public static DynamicTableFactory.Context testContext(
@@ -187,12 +188,14 @@ public class KafkaLogTestUtils {
LogChangelogMode changelogMode,
LogConsistency consistency,
RowType type,
- int[] keys) {
+ int[] keys,
+ Map<String, String> dynamicOptions) {
Map<String, String> options = new HashMap<>();
options.put(LOG_CHANGELOG_MODE.key(), changelogMode.toString());
options.put(LOG_CONSISTENCY.key(), consistency.toString());
options.put(BOOTSTRAP_SERVERS.key(), servers);
options.put(TOPIC.key(), UUID.randomUUID().toString());
+ options.putAll(dynamicOptions);
return createContext(name, type, keys, options);
}