imaffe commented on a change in pull request #18406:
URL: https://github.com/apache/flink/pull/18406#discussion_r811610572
##########
File path:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
##########
@@ -108,6 +125,329 @@ void createFromFlinkTypeInformation() throws Exception {
assertEquals(collector.result, "test-content");
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void primitiveStringPulsarSchema(boolean useNativePulsarSchema) {
+ final String topicName =
+ "primitiveString-" + ThreadLocalRandom.current().nextLong(0,
Long.MAX_VALUE);
+ operator().createTopic(topicName, 1);
+ String expectedMessage = randomAlphabetic(10);
+ operator()
+ .sendMessage(
+ TopicNameUtils.topicNameWithPartition(topicName, 0),
+ Schema.STRING,
+ expectedMessage);
+ PulsarSource<String> source;
+ if (useNativePulsarSchema) {
+ source = createSource(topicName,
nativePulsarSchema(Schema.STRING));
+ } else {
+ source = createSource(topicName, pulsarSchema(Schema.STRING));
+ }
+
+ assertThatCode(() -> runPipeline(source,
expectedMessage)).doesNotThrowAnyException();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void unversionedJsonStructPulsarSchema(boolean useNativePulsarSchema) {
+ final String topicName =
+ "unversionedJsonStruct-" +
ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
+ operator().createTopic(topicName, 1);
+ TestingUser expectedMessage = createRandomUser();
+ operator()
+ .sendMessage(
+ TopicNameUtils.topicNameWithPartition(topicName, 0),
+ Schema.JSON(TestingUser.class),
+ expectedMessage);
+ PulsarSource<TestingUser> source;
+ if (useNativePulsarSchema) {
+ source =
+ createSource(
+ topicName,
+ nativePulsarSchema(Schema.JSON(TestingUser.class),
TestingUser.class));
+ } else {
+ source =
+ createSource(
+ topicName,
+ pulsarSchema(Schema.JSON(TestingUser.class),
TestingUser.class));
+ }
+ assertThatCode(() -> runPipeline(source,
expectedMessage)).doesNotThrowAnyException();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void keyValueJsonStructPulsarSchema(boolean useNativePulsarSchema) {
+ final String topicName =
+ "keyValueJsonStruct-" +
ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
+ operator().createTopic(topicName, 1);
+ KeyValue<TestingUser, TestingUser> expectedMessage =
+ new KeyValue<>(createRandomUser(), createRandomUser());
+ operator()
+ .sendMessage(
+ TopicNameUtils.topicNameWithPartition(topicName, 0),
+ Schema.KeyValue(
+ Schema.JSON(TestingUser.class),
Schema.JSON(TestingUser.class)),
+ expectedMessage);
+ PulsarSource<KeyValue<TestingUser, TestingUser>> source;
+ if (useNativePulsarSchema) {
+ source =
+ createSource(
+ topicName,
+ nativePulsarSchema(
+ Schema.KeyValue(
+ Schema.JSON(TestingUser.class),
+ Schema.JSON(TestingUser.class)),
+ TestingUser.class,
+ TestingUser.class));
+ } else {
+ source =
+ createSource(
+ topicName,
+ pulsarSchema(
+ Schema.KeyValue(
+ Schema.JSON(TestingUser.class),
+ Schema.JSON(TestingUser.class)),
+ TestingUser.class,
+ TestingUser.class));
+ }
+
+ assertThatCode(() -> runPipeline(source,
expectedMessage)).doesNotThrowAnyException();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void keyValueAvroStructPulsarSchema(boolean useNativePulsarSchema) {
+ final String topicName =
+ "keyValueAvroStruct-" +
ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
+ operator().createTopic(topicName, 1);
+ KeyValue<TestingUser, TestingUser> expectedMessage =
+ new KeyValue<>(createRandomUser(), createRandomUser());
+ operator()
+ .sendMessage(
+ TopicNameUtils.topicNameWithPartition(topicName, 0),
+ Schema.KeyValue(
+ Schema.AVRO(TestingUser.class),
Schema.AVRO(TestingUser.class)),
+ expectedMessage);
+ PulsarSource<KeyValue<TestingUser, TestingUser>> source;
+ if (useNativePulsarSchema) {
+ source =
+ createSource(
+ topicName,
+ nativePulsarSchema(
+ Schema.KeyValue(
+ Schema.AVRO(TestingUser.class),
+ Schema.AVRO(TestingUser.class)),
+ TestingUser.class,
+ TestingUser.class));
+ } else {
+ source =
+ createSource(
+ topicName,
+ pulsarSchema(
+ Schema.KeyValue(
+ Schema.AVRO(TestingUser.class),
+ Schema.AVRO(TestingUser.class)),
+ TestingUser.class,
+ TestingUser.class));
+ }
+
+ assertThatCode(() -> runPipeline(source,
expectedMessage)).doesNotThrowAnyException();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void keyValuePrimitivePulsarSchema(boolean useNativePulsarSchema) {
+ final String topicName =
+ "keyValuePrimitive-" + ThreadLocalRandom.current().nextLong(0,
Long.MAX_VALUE);
+ operator().createTopic(topicName, 1);
+ KeyValue<String, Integer> expectedMessage = new
KeyValue<>(randomAlphabetic(5), 5);
+ operator()
+ .sendMessage(
+ TopicNameUtils.topicNameWithPartition(topicName, 0),
+ Schema.KeyValue(Schema.STRING, Schema.INT32),
+ expectedMessage);
+ PulsarSource<KeyValue<String, Integer>> source;
+ if (useNativePulsarSchema) {
+ source =
+ createSource(
+ topicName,
+ nativePulsarSchema(
+ Schema.KeyValue(Schema.STRING,
Schema.INT32),
+ String.class,
+ Integer.class));
+ } else {
+ source =
+ createSource(
+ topicName,
+ pulsarSchema(
+ Schema.KeyValue(Schema.STRING,
Schema.INT32),
+ String.class,
+ Integer.class));
+ }
+
+ assertThatCode(() -> runPipeline(source,
expectedMessage)).doesNotThrowAnyException();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void keyValuePrimitiveKeyStructValuePulsarSchema(boolean
useNativePulsarSchema) {
+ final String topicName =
+ "primitiveKeyStructValue-"
+ + ThreadLocalRandom.current().nextLong(0,
Long.MAX_VALUE);
+ operator().createTopic(topicName, 1);
+ KeyValue<String, TestingUser> expectedMessage =
+ new KeyValue<>(randomAlphabetic(5), createRandomUser());
+ operator()
+ .sendMessage(
+ TopicNameUtils.topicNameWithPartition(topicName, 0),
+ Schema.KeyValue(Schema.STRING,
Schema.JSON(TestingUser.class)),
+ expectedMessage);
+ PulsarSource<KeyValue<String, TestingUser>> source;
+ if (useNativePulsarSchema) {
+ source =
+ createSource(
+ topicName,
+ nativePulsarSchema(
+ Schema.KeyValue(Schema.STRING,
Schema.JSON(TestingUser.class)),
+ String.class,
+ TestingUser.class));
+ } else {
+ source =
+ createSource(
+ topicName,
+ pulsarSchema(
+ Schema.KeyValue(Schema.STRING,
Schema.JSON(TestingUser.class)),
+ String.class,
+ TestingUser.class));
+ }
+
+ assertThatCode(() -> runPipeline(source,
expectedMessage)).doesNotThrowAnyException();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void keyValueStructKeyPrimitiveValuePulsarSchema(boolean
useNativePulsarSchema) {
+ final String topicName =
+ "structKeyPrimitiveValue-"
+ + ThreadLocalRandom.current().nextLong(0,
Long.MAX_VALUE);
+ operator().createTopic(topicName, 1);
+ KeyValue<TestingUser, String> expectedMessage =
+ new KeyValue<>(createRandomUser(), randomAlphabetic(5));
+ operator()
+ .sendMessage(
+ TopicNameUtils.topicNameWithPartition(topicName, 0),
+ Schema.KeyValue(Schema.JSON(TestingUser.class),
Schema.STRING),
+ expectedMessage);
+ PulsarSource<KeyValue<TestingUser, String>> source;
+ if (useNativePulsarSchema) {
+ source =
+ createSource(
+ topicName,
+ nativePulsarSchema(
+
Schema.KeyValue(Schema.JSON(TestingUser.class), Schema.STRING),
+ TestingUser.class,
+ String.class));
+ } else {
+ source =
+ createSource(
+ topicName,
+ pulsarSchema(
+
Schema.KeyValue(Schema.JSON(TestingUser.class), Schema.STRING),
+ TestingUser.class,
+ String.class));
+ }
+
+ assertThatCode(() -> runPipeline(source,
expectedMessage)).doesNotThrowAnyException();
+ }
+
+ @Test
+ void simpleFlinkSchema() {
+ final String topicName =
+ "simpleFlinkSchema-" + ThreadLocalRandom.current().nextLong(0,
Long.MAX_VALUE);
+ operator().createTopic(topicName, 1);
+ String expectedMessage = randomAlphabetic(5);
+ operator()
+ .sendMessage(
+ TopicNameUtils.topicNameWithPartition(topicName, 0),
+ Schema.STRING,
+ expectedMessage);
+ PulsarSource<String> source =
+ createSource(topicName, flinkSchema(new SimpleStringSchema()));
+ assertThatCode(() -> runPipeline(source,
expectedMessage)).doesNotThrowAnyException();
+ }
+
+ private PulsarSource createSource(
+ String topicName, PulsarDeserializationSchema<?>
deserializationSchema) {
+ return PulsarSource.builder()
+ .setDeserializationSchema(deserializationSchema)
+ .setServiceUrl(operator().serviceUrl())
+ .setAdminUrl(operator().adminUrl())
+ .setTopics(topicName)
+ .setSubscriptionType(Exclusive)
+ .setSubscriptionName(topicName + "-subscription")
+ .setBoundedStopCursor(StopCursor.latest())
+
.setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L)
+ .build();
+ }
+
+ private <T> void runPipeline(PulsarSource<T> source, T expected) throws
Exception {
+ try (CloseableIterator<T> iterator =
+ StreamExecutionEnvironment.getExecutionEnvironment()
+ .setParallelism(1)
+ .fromSource(source, WatermarkStrategy.noWatermarks(),
"testSource")
+ .executeAndCollect()) {
+ assertThat(iterator).hasNext();
+ assertThat(iterator.next()).isEqualTo(expected);
+ }
+ }
+
+ /** A test POJO class. */
Review comment:
Actually is this really a POJO ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]