AHeise commented on a change in pull request #15304: URL: https://github.com/apache/flink/pull/15304#discussion_r689294113
########## File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeSerializerTest.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.schema; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase.TestInputView; +import org.apache.flink.api.common.typeutils.ComparatorTestBase.TestOutputView; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.connector.pulsar.SampleMessage.TestEnum; +import org.apache.flink.connector.pulsar.SampleMessage.TestMessage; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ThreadLocalRandom; + +import static java.util.Collections.nCopies; +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; +import static org.apache.pulsar.client.api.Schema.PROTOBUF_NATIVE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** Unit tests for {@link PulsarSchemaTypeSerializer}. */ +class PulsarSchemaTypeSerializerTest { + + private final PulsarSchemaTypeSerializer<TestMessage> serializer = + new PulsarSchemaTypeSerializer<>( + new PulsarSchema<>(PROTOBUF_NATIVE(TestMessage.class), TestMessage.class)); + + TestMessage message = Review comment: private final ########## File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source; + +import org.apache.flink.connector.pulsar.testutils.PulsarContainerContextFactory; +import org.apache.flink.connector.pulsar.testutils.PulsarContainerEnvironment; +import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicConsumingContext; +import org.apache.flink.connector.pulsar.testutils.cases.SingleTopicConsumingContext; +import org.apache.flink.connectors.test.common.environment.MiniClusterTestEnvironment; +import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory; +import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem; +import org.apache.flink.connectors.test.common.junit.annotations.TestEnv; +import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase; + +import org.junit.jupiter.api.Nested; + +/** Unite test class for {@link PulsarSource}. */ +@SuppressWarnings("unused") +class PulsarSourceITCase { + + @Nested + class SingleTopicConsuming extends SourceTestSuiteBase<String> { + + // Defines test environment on Flink MiniCluster + @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment(); + + // Defines pulsar running environment + @ExternalSystem PulsarContainerEnvironment pulsar = new PulsarContainerEnvironment(); + + // Defines a external context Factories, + // so test cases will be invoked using this external contexts. + @ExternalContextFactory + PulsarContainerContextFactory<String, SingleTopicConsumingContext> factory = + new PulsarContainerContextFactory<>(pulsar, SingleTopicConsumingContext.class); + } + + @Nested + class MultipleTopicConsuming extends SourceTestSuiteBase<String> { + + // Defines test environment on Flink MiniCluster + @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment(); + + // Defines pulsar running environment + @ExternalSystem PulsarContainerEnvironment pulsar = new PulsarContainerEnvironment(); + + // Defines a external context Factories, + // so test cases will be invoked using this external contexts. + @ExternalContextFactory + PulsarContainerContextFactory<String, MultipleTopicConsumingContext> factory = + new PulsarContainerContextFactory<>(pulsar, MultipleTopicConsumingContext.class); + } +} Review comment: Could be ```suggestion class PulsarSourceITCase extends SourceTestSuiteBase<String> { // Defines test environment on Flink MiniCluster @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment(); // Defines pulsar running environment @ExternalSystem PulsarContainerEnvironment pulsar = new PulsarContainerEnvironment(); // Defines a external context Factories, // so test cases will be invoked using this external contexts. @ExternalContextFactory PulsarContainerContextFactory<String, MultipleTopicConsumingContext> multipleTopic = new PulsarContainerContextFactory<>(pulsar, MultipleTopicConsumingContext.class); // Defines a external context Factories, // so test cases will be invoked using this external contexts. @ExternalContextFactory PulsarContainerContextFactory<String, SingleTopicConsumingContext> singleTopic = new PulsarContainerContextFactory<>(pulsar, SingleTopicConsumingContext.class); } ``` But I also like your current solution. ########## File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarContainerContext.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.testutils; + +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connectors.test.common.external.ExternalContext; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.function.Supplier; + +import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric; + +/** Common test context for pulsar container based test. */ +public abstract class PulsarContainerContext<T> implements ExternalContext<T> { Review comment: Would be nice to overwrite `toString` such that display name renders properly https://prnt.sc/1pmmzwb ########## File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarContainerContextFactory.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.testutils; + +import org.apache.flink.connectors.test.common.external.ExternalContext; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * Factory for creating all the test context that extends {@link PulsarContainerContext}. Test + * context class should have a constructor with {@link PulsarContainerEnvironment} arg. + */ +public class PulsarContainerContextFactory<F, T extends PulsarContainerContext<F>> + implements ExternalContext.Factory<F> { + + private final PulsarContainerEnvironment environment; + private final Class<T> contextClass; + + public PulsarContainerContextFactory( + PulsarContainerEnvironment environment, Class<T> contextClass) { Review comment: Please use `Function<PulsarContainerEnvironment, T> contextFactory` instead of contextClass, so that you can pass `new PulsarContainerContextFactory<>(pulsar, MultipleTopicConsumingContext::new);`. Then we avoid all the reflection hacks. ########## File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuitBase.java ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.testutils; + +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.Collection; +import java.util.List; +import java.util.function.Supplier; + +/** + * The base class for the all Pulsar related test sites. It brings up: + * + * <ul> + * <li>A Zookeeper cluster. + * <li>Pulsar Broker. + * <li>A Bookkeeper cluster. + * </ul> + * + * <p>You just need to write a JUnit 5 test class and extends this suite class. All the helper + * method list below would be ready. + * + * <p>{@code PulsarSourceEnumeratorTest} would be a test example for how to use this base class. If + * you have some setup logic, such as create topic or send message, just place then in a setup + * method with annotation {@code @BeforeAll}. This setup method would not require {@code static}. + * + * @see PulsarContainerOperator for how to use the helper methods in this class. + */ +@ExtendWith(TestLoggerExtension.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class PulsarTestSuitBase { Review comment: ```suggestion public abstract class PulsarTestSuiteBase { ``` ########## File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarContainerOperator.java ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.testutils; + +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import org.apache.flink.connectors.test.common.external.ExternalContext; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.TopicName; + +import java.io.Closeable; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; +import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric; +import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin; +import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient; +import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyThrow; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A pulsar container operator for operating pulsar instance. It's serializable for using in {@link + * ExternalContext}. + */ +public class PulsarContainerOperator implements Serializable, Closeable { + private static final long serialVersionUID = -630646912412751301L; + + public static final int DEFAULT_PARTITIONS = 10; + public static final int NUM_RECORDS_PER_PARTITION = 20; + + private String serviceUrl; + private String adminUrl; + + private transient PulsarClient client; + private transient PulsarAdmin admin; + + public PulsarContainerOperator(String serviceUrl, String adminUrl) { + this.serviceUrl = serviceUrl; + this.adminUrl = adminUrl; + initializeClients(); + } + + /** + * Create a topic with default {@link #DEFAULT_PARTITIONS} partitions and send a fixed number + * {@link #NUM_RECORDS_PER_PARTITION} of records to this topic. + */ + public void setupTopic(String topic) { + Random random = new Random(System.currentTimeMillis()); + setupTopic(topic, Schema.STRING, () -> randomAlphanumeric(10 + random.nextInt(20))); + } + + public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> supplier) { + createTopic(topic, DEFAULT_PARTITIONS); + + // Make sure every topic partition has message. + for (int i = 0; i < DEFAULT_PARTITIONS; i++) { + String partitionName = TopicNameUtils.topicNameWithPartition(topic, i); + List<T> messages = + Stream.generate(supplier).limit(NUM_RECORDS_PER_PARTITION).collect(toList()); + + sendMessages(partitionName, schema, messages); + } + } + + public void createTopic(String topic, int numberOfPartitions) { + checkArgument(numberOfPartitions >= 0); + if (numberOfPartitions == 0) { + createNonPartitionedTopic(topic); + } else { + createPartitionedTopic(topic, numberOfPartitions); + } + } + + public void increaseTopicPartitions(String topic, int newPartitionsNum) { + sneakyAdmin(() -> admin().topics().updatePartitionedTopic(topic, newPartitionsNum)); + } + + public void deleteTopic(String topic, boolean isPartitioned) { + if (isPartitioned) { + sneakyAdmin(() -> admin().topics().deletePartitionedTopic(topic)); + } else { + sneakyAdmin(() -> admin().topics().delete(topic)); + } + } + + public List<TopicPartition> topicInfo(String topic) { + try { + return client().getPartitionsForTopic(topic).get().stream() + .map( + p -> + new TopicPartition( + topic, + TopicName.getPartitionIndex(p), + TopicRange.createFullRange())) + .collect(toList()); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } + + protected List<TopicPartition> topicsInfo(Collection<String> topics) { + return topics.stream().flatMap(s -> topicInfo(s).stream()).collect(toList()); + } + + public <T> MessageId sendMessage(String topic, Schema<T> schema, T message) { + List<MessageId> messageIds = sendMessages(topic, schema, singletonList(message)); + checkArgument(messageIds.size() == 1); + + return messageIds.get(0); + } + + public <T> List<MessageId> sendMessages( + String topic, Schema<T> schema, Collection<T> messages) { + try (Producer<T> producer = client().newProducer(schema).topic(topic).create()) { + List<MessageId> messageIds = new ArrayList<>(messages.size()); + + for (T message : messages) { + MessageId messageId = producer.newMessage().value(message).send(); + messageIds.add(messageId); + } + + return messageIds; + } catch (PulsarClientException e) { + sneakyThrow(e); + return emptyList(); + } + } + + public String serviceUrl() { + return serviceUrl; + } + + public String adminUrl() { + return adminUrl; + } + + public PulsarClient client() { + return client; + } + + public PulsarAdmin admin() { + return admin; + } + + @Override + public void close() throws IOException { + if (admin != null) { + admin.close(); + } + if (client != null) { + client.close(); + } + } + + // --------------------------- Private Methods ----------------------------- + + private void createNonPartitionedTopic(String topic) { + try { + admin().lookups().lookupTopic(topic); + sneakyAdmin(() -> admin().topics().expireMessagesForAllSubscriptions(topic, 0)); + } catch (PulsarAdminException e) { + sneakyAdmin(() -> admin().topics().createNonPartitionedTopic(topic)); + } + } + + private void createPartitionedTopic(String topic, int numberOfPartitions) { + try { + admin().lookups().lookupPartitionedTopic(topic); + sneakyAdmin(() -> admin().topics().expireMessagesForAllSubscriptionsAsync(topic, 0)); + } catch (PulsarAdminException e) { + sneakyAdmin(() -> admin().topics().createPartitionedTopic(topic, numberOfPartitions)); + } + } + + private void initializeClients() { + if (client == null) { Review comment: Remove if checks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
