vahmed-hamdy commented on code in PR #2: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/2#discussion_r1627352870
########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorState; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubSourceEnumerator; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubRecordEmitter; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSourceReader; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSplitReader; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ProjectSubscriptionName; + +import java.io.IOException; +import java.time.Duration; +import java.util.Properties; +import java.util.function.Supplier; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * A source implementation to pull messages from GCP Pub/Sub into Flink. + * + * <p>The {@link PubSubSourceEnumerator} assigns a static {@link PubSubSplit} to every {@link + * PubSubSourceReader} that joins. 
The split does not contain any split-specific information because + * Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing + * partitions or offsets. However, Pub/Sub will automatically load-balance messages between multiple + * readers using same subscription. + * + * <p>A {@link PubSubSource} can be constructed through the {@link PubSubSourceBuilder} like so: + * + * <pre>{@code + * PubSubSource.<String>builder() + * // The deserialization schema to deserialize Pub/Sub messages + * .setDeserializationSchema(new SimpleStringSchema()) + * // The name string of your Pub/Sub project + * .setProjectName(PROJECT_NAME) + * // The name string of the subscription you would like to receive messages from + * .setSubscriptionName(SUBSCRIPTION_NAME) + * // An instance of com.google.auth.Credentials to authenticate against Google Cloud + * .setCredentials(CREDENTIALS) + * .setPubSubSubscriberFactory( + * // The maximum number of messages that should be pulled in one go + * 3, + * // The timeout after which a message pull request is deemed a failure + * Duration.ofSeconds(1), + * // The number of times the reception of a message should be retried in case of failure + * 10) + * .build(); + * }</pre> + * + * <p>More details can be found at {@link PubSubSourceBuilder} + * + * @param <OUT> The output type of the source. + */ +public class PubSubSource<OUT> + implements Source<OUT, PubSubSplit, PubSubEnumeratorState>, ResultTypeQueryable<OUT> { + private final PubSubDeserializationSchema<OUT> deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Properties props; Review Comment: nit: rename `props`; is it client props, reader props, ...? ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ + +package org.apache.flink.connector.gcp.pubsub.source; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorState; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubSourceEnumerator; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubRecordEmitter; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSourceReader; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSplitReader; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ProjectSubscriptionName; + +import java.io.IOException; +import java.time.Duration; +import java.util.Properties; +import java.util.function.Supplier; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * A source implementation to pull messages from GCP Pub/Sub into Flink. + * + * <p>The {@link PubSubSourceEnumerator} assigns a static {@link PubSubSplit} to every {@link + * PubSubSourceReader} that joins. The split does not contain any split-specific information because + * Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing + * partitions or offsets. However, Pub/Sub will automatically load-balance messages between multiple + * readers using same subscription. 
+ * + * <p>A {@link PubSubSource} can be constructed through the {@link PubSubSourceBuilder} like so: + * + * <pre>{@code + * PubSubSource.<String>builder() + * // The deserialization schema to deserialize Pub/Sub messages + * .setDeserializationSchema(new SimpleStringSchema()) + * // The name string of your Pub/Sub project + * .setProjectName(PROJECT_NAME) + * // The name string of the subscription you would like to receive messages from + * .setSubscriptionName(SUBSCRIPTION_NAME) + * // An instance of com.google.auth.Credentials to authenticate against Google Cloud + * .setCredentials(CREDENTIALS) + * .setPubSubSubscriberFactory( + * // The maximum number of messages that should be pulled in one go + * 3, + * // The timeout after which a message pull request is deemed a failure + * Duration.ofSeconds(1), + * // The number of times the reception of a message should be retried in case of failure + * 10) + * .build(); + * }</pre> + * + * <p>More details can be found at {@link PubSubSourceBuilder} + * + * @param <OUT> The output type of the source. + */ +public class PubSubSource<OUT> + implements Source<OUT, PubSubSplit, PubSubEnumeratorState>, ResultTypeQueryable<OUT> { + private final PubSubDeserializationSchema<OUT> deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Properties props; + private final Credentials credentials; + + private PubSubSource( + PubSubDeserializationSchema<OUT> deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Properties props, + Credentials credentials) { + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.props = props; + this.credentials = credentials; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext readerContext) { + FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> elementsQueue = + new FutureCompletingBlockingQueue<>(); + Supplier<PubSubSplitReader<OUT>> splitReaderSupplier = + () -> + new PubSubSplitReader<>( + deserializationSchema, pubSubSubscriberFactory, credentials); + PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>(); + + return new PubSubSourceReader<>( + elementsQueue, + splitReaderSupplier, + recordEmitter, + toConfiguration(props), + readerContext); + } + + @Override + public SplitEnumerator<PubSubSplit, PubSubEnumeratorState> createEnumerator( + SplitEnumeratorContext<PubSubSplit> enumContext) { + return new PubSubSourceEnumerator(enumContext); + } + + @Override + public SplitEnumerator<PubSubSplit, PubSubEnumeratorState> restoreEnumerator( + SplitEnumeratorContext<PubSubSplit> enumContext, PubSubEnumeratorState checkpoint) { + return new PubSubSourceEnumerator(enumContext); + } + + @Override + public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() { + return new PubSubSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer<PubSubEnumeratorState> getEnumeratorCheckpointSerializer() { + return new PubSubEnumeratorStateSerializer(); + } + + @Override + public TypeInformation<OUT> getProducedType() { + return deserializationSchema.getProducedType(); + } + + /** + * Get a builder to build a {@link PubSubSource}. + * + * @return A builder for a {@link PubSubSource}. 
+ */ + public static <OUT> PubSubSourceBuilder<OUT> builder() { + return new PubSubSourceBuilder<>(); + } + + /** @param <OUT> */ + public static class PubSubSourceBuilder<OUT> { + private static final int DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES = 3; + private static final int DEFAULT_PUBSUB_SUBSCRIBER_MAX_MESSAGES_PER_PULL = 100; + + private PubSubDeserializationSchema<OUT> deserializationSchema; + private String projectName; + private String subscriptionName; + private PubSubSubscriberFactory pubSubSubscriberFactory; + private Properties props; + private Credentials credentials; + + private PubSubSourceBuilder() { + this.props = new Properties(); + } + + /** + * Set the DeserializationSchema used to deserialize incoming Pub/Sub messages. Use any + * {@link DeserializationSchema} to use in the {@link PubSubSource}. The schema will be + * wrapped automatically for compatibility with the source. + * + * @param deserializationSchema a deserialization schema to use. + */ + public PubSubSourceBuilder<OUT> setDeserializationSchema( + DeserializationSchema<OUT> deserializationSchema) { + Preconditions.checkNotNull(deserializationSchema); + this.deserializationSchema = new DeserializationSchemaWrapper<>(deserializationSchema); + return this; + } + + /** + * Set the PubSubDeserializationSchema used to deserialize incoming Pub/Sub messages. + * + * @param deserializationSchema a deserialization schema to use. + */ + public PubSubSourceBuilder<OUT> setDeserializationSchema( + PubSubDeserializationSchema<OUT> deserializationSchema) { + Preconditions.checkNotNull(deserializationSchema); + this.deserializationSchema = deserializationSchema; + return this; + } + + /** @param projectName the name string of your Pub/Sub project */ + public PubSubSourceBuilder<OUT> setProjectName(String projectName) { + Preconditions.checkNotNull(projectName); + this.projectName = projectName; + return this; + } + + /** + * @param subscriptionName the name string of the subscription you would like to receive + * messages from + */ + public PubSubSourceBuilder<OUT> setSubscriptionName(String subscriptionName) { + Preconditions.checkNotNull(subscriptionName); + this.subscriptionName = subscriptionName; + return this; + } + + /** + * @param credentials an instance of {@link com.google.auth.Credentials} to authenticate + * against Google Cloud + */ + public PubSubSourceBuilder<OUT> setCredentials(Credentials credentials) { + this.credentials = credentials; + return this; + } + + /** @param pubSubSubscriberFactory a custom factory to create Pub/Sub subscribers from */ + public PubSubSourceBuilder<OUT> setPubSubSubscriberFactory( + PubSubSubscriberFactory pubSubSubscriberFactory) { + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + return this; + } + + /** + * Create a parameterized {@link DefaultPubSubSubscriberFactory} and set it on the builder. + * + * @param maxMessagesPerPull The maximum number of messages that should be pulled in one go. + * @param perRequestTimeout The timeout per request from the subscriber + * @param retries The number of times the reception of a message should be retried in case + * of failure. 
+ */ + public PubSubSourceBuilder<OUT> setPubSubSubscriberFactory( + int maxMessagesPerPull, Duration perRequestTimeout, int retries) { + this.pubSubSubscriberFactory = + new DefaultPubSubSubscriberFactory( + ProjectSubscriptionName.format(projectName, subscriptionName), + retries, + perRequestTimeout, + maxMessagesPerPull); + return this; + } + + public PubSubSourceBuilder<OUT> setProps(Properties props) { + this.props = props; + return this; + } + + public PubSubSource<OUT> build() throws IOException { + Preconditions.checkNotNull( + deserializationSchema, "Deserialization schema must be provided."); + Preconditions.checkNotNull( + projectName, "Google Cloud Pub/Sub projectName must be set."); + Preconditions.checkNotNull( + subscriptionName, "Google Cloud Pub/Sub subscriptionName must be set."); + + if (credentials == null) { + credentials = defaultCredentialsProviderBuilder().build().getCredentials(); + } + + if (pubSubSubscriberFactory == null) { + pubSubSubscriberFactory = + new DefaultPubSubSubscriberFactory( + ProjectSubscriptionName.format(projectName, subscriptionName), + DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES, + Duration.ofSeconds(15), Review Comment: nit: why not expose as default like the others? ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; + +/** + * The enumerator for the {@link PubSubSource}. It does not do any work discovery as envisioned by + * FLIP-27 because GCP Pub/Sub hides partitions and other implementation details. + */ +public class PubSubSourceEnumerator implements SplitEnumerator<PubSubSplit, PubSubEnumeratorState> { Review Comment: missing annotation, `@Internal` ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Collector; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ReceivedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * A {@link SplitReader} to read from a given {@link PubSubSubscriber}. + * + * @param <T> the type of the record. + */ +public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class); + private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5; + private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000; + private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000; + private final PubSubDeserializationSchema<T> deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Credentials credentials; + private volatile PubSubSubscriber subscriber; + private final PubSubCollector collector; + + // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread + // processes messages and the fetcher thread acknowledges them, the thread-safe queue + // decouples them. + private final BlockingQueue<String> ackIdsQueue = + new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY); + private final Map<Long, List<String>> messageIdsToAcknowledge = new HashMap<>(); + + /** + * @param deserializationSchema a deserialization schema to apply to incoming message payloads. 
+ * @param pubSubSubscriberFactory a factory from which a new subscriber can be created from + * @param credentials the credentials to use for creating a new subscriber + */ + public PubSubSplitReader( + PubSubDeserializationSchema<T> deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Credentials credentials) { + + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.collector = new PubSubCollector(); + } + + @Override + public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException { + RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new RecordsBySplits.Builder<>(); + if (subscriber == null) { + synchronized (this) { + if (subscriber == null) { + subscriber = pubSubSubscriberFactory.getSubscriber(credentials); + } + } + } + + for (ReceivedMessage receivedMessage : subscriber.pull()) { + try { + // Deserialize messages into a collector so that logic in the user-provided + // deserialization schema decides how to map GCP Pub/Sub messages to records in + // Flink. This allows e.g. batching together multiple Flink records in a single GCP + // Pub/Sub message. + deserializationSchema.deserialize(receivedMessage.getMessage(), collector); + collector + .getMessages() + .forEach( + message -> + recordsBySplits.add( + PubSubSplit.SPLIT_ID, + new Tuple2<>( + message, + // A timestamp provided by GCP Pub/Sub + // indicating when the message was initially + // published + receivedMessage Review Comment: What is the timestamp used for? Maybe we can support [orderingKeys](https://cloud.google.com/pubsub/docs/ordering) instead. ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
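To make the orderingKeys suggestion above concrete, here is a minimal sketch of the per-message metadata that could be surfaced (the helper class and method names are hypothetical). Note also that `getPublishTime().getSeconds()` yields seconds, so if this value is meant to be used as a Flink event timestamp it would presumably need converting to milliseconds:

```java
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.ReceivedMessage;

// Hypothetical helper illustrating the per-message metadata discussed above.
final class PubSubMessageMetadata {

    /** Publish time as epoch millis; Flink event timestamps are millis, not seconds. */
    static long publishTimeMillis(ReceivedMessage received) {
        Timestamp publishTime = received.getMessage().getPublishTime();
        return publishTime.getSeconds() * 1000L + publishTime.getNanos() / 1_000_000L;
    }

    /** The ordering key; Pub/Sub returns an empty string when the topic does not use ordering. */
    static String orderingKey(ReceivedMessage received) {
        return received.getMessage().getOrderingKey();
    }
}
```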
+ * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Collector; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ReceivedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * A {@link SplitReader} to read from a given {@link PubSubSubscriber}. + * + * @param <T> the type of the record. + */ +public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class); + private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5; + private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000; + private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000; + private final PubSubDeserializationSchema<T> deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Credentials credentials; + private volatile PubSubSubscriber subscriber; + private final PubSubCollector collector; + + // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread + // processes messages and the fetcher thread acknowledges them, the thread-safe queue + // decouples them. + private final BlockingQueue<String> ackIdsQueue = + new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY); + private final Map<Long, List<String>> messageIdsToAcknowledge = new HashMap<>(); + + /** + * @param deserializationSchema a deserialization schema to apply to incoming message payloads. 
+ * @param pubSubSubscriberFactory a factory from which a new subscriber can be created from + * @param credentials the credentials to use for creating a new subscriber + */ + public PubSubSplitReader( + PubSubDeserializationSchema<T> deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Credentials credentials) { + + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.collector = new PubSubCollector(); + } + + @Override + public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException { + RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new RecordsBySplits.Builder<>(); + if (subscriber == null) { + synchronized (this) { + if (subscriber == null) { + subscriber = pubSubSubscriberFactory.getSubscriber(credentials); + } + } + } + + for (ReceivedMessage receivedMessage : subscriber.pull()) { + try { + // Deserialize messages into a collector so that logic in the user-provided + // deserialization schema decides how to map GCP Pub/Sub messages to records in + // Flink. This allows e.g. batching together multiple Flink records in a single GCP + // Pub/Sub message. + deserializationSchema.deserialize(receivedMessage.getMessage(), collector); + collector + .getMessages() + .forEach( + message -> + recordsBySplits.add( + PubSubSplit.SPLIT_ID, + new Tuple2<>( + message, + // A timestamp provided by GCP Pub/Sub + // indicating when the message was initially + // published + receivedMessage + .getMessage() + .getPublishTime() + .getSeconds()))); + } catch (Exception e) { + throw new IOException("Failed to deserialize received message due to", e); + } finally { + collector.reset(); + } + + enqueueAcknowledgementId(receivedMessage.getAckId()); + } + + return recordsBySplits.build(); + } + + /** + * Enqueue an acknowledgment ID to be acknowledged towards GCP Pub/Sub with retries. + * + * @param ackId the ID of the message to acknowledge + */ + public void enqueueAcknowledgementId(String ackId) { Review Comment: IMO, acking and checkpoint handling is no longer the responsibility of the SplitReader; we are inflating the class. If we remove deserialisation, the BlockingQueue would hold PubsubMessages with their ackIds, and the `PubSubSourceReader` would be responsible for the checkpoint operations. ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
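A rough sketch of the division of responsibilities suggested above, with the checkpoint bookkeeping moved out of the split reader (the class and method names are hypothetical; the real wiring would live in `PubSubSourceReader#snapshotState` and `notifyCheckpointComplete`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical tracker owned by the source reader: ack IDs are tied to the
// checkpoint in which their messages were emitted and only acknowledged
// towards Pub/Sub once that checkpoint completes.
final class AckTracker {
    // Ack IDs of messages emitted since the last checkpoint.
    private final List<String> pendingAckIds = new ArrayList<>();
    // Ack IDs frozen per checkpoint, acknowledged once the checkpoint completes.
    private final SortedMap<Long, List<String>> ackIdsPerCheckpoint = new TreeMap<>();

    void onMessageEmitted(String ackId) {
        pendingAckIds.add(ackId);
    }

    /** Called from SourceReader#snapshotState(checkpointId). */
    void snapshot(long checkpointId) {
        ackIdsPerCheckpoint.put(checkpointId, new ArrayList<>(pendingAckIds));
        pendingAckIds.clear();
    }

    /** Called from SourceReader#notifyCheckpointComplete(checkpointId). */
    List<String> ackIdsUpTo(long checkpointId) {
        List<String> toAck = new ArrayList<>();
        for (List<String> ids : ackIdsPerCheckpoint.headMap(checkpointId + 1).values()) {
            toAck.addAll(ids);
        }
        ackIdsPerCheckpoint.headMap(checkpointId + 1).clear();
        return toAck;
    }
}
```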
+ * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitState; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** The source reader to read from GCP Pub/Sub. */ +public class PubSubSourceReader<T> + extends SingleThreadMultiplexSourceReaderBase< + Tuple2<T, Long>, T, PubSubSplit, PubSubSplitState> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSourceReader.class); + + public PubSubSourceReader( + FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, Long>>> elementsQueue, + Supplier<PubSubSplitReader<T>> splitReaderSupplier, + RecordEmitter<Tuple2<T, Long>, T, PubSubSplitState> recordEmitter, + Configuration config, + SourceReaderContext context) { + super( + elementsQueue, + new PubSubSourceFetcherManager<>(elementsQueue, splitReaderSupplier::get, config), + recordEmitter, + config, + context); + } + + @Override + protected void onSplitFinished(Map<String, PubSubSplitState> finishedSplitIds) {} + + @Override + public List<PubSubSplit> snapshotState(long checkpointId) { + ((PubSubSourceFetcherManager<T>) splitFetcherManager) + .prepareForAcknowledgement(checkpointId); + return Collections.singletonList(new PubSubSplit()); Review Comment: Why are we returning anything here? IIUC the `PubSubSplit` is a no-data object? ########## flink-connector-gcp-pubsub/README.md: ########## @@ -0,0 +1,98 @@ +# Flink Source for Google Cloud Pub/Sub Review Comment: this should go to `docs` ########## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ + +package org.apache.flink.connector.gcp.pubsub; Review Comment: It should be enough to add this to the e2e tests module. ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorState; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubSourceEnumerator; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubRecordEmitter; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSourceReader; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSplitReader; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ProjectSubscriptionName; + +import java.io.IOException; +import java.time.Duration; +import java.util.Properties; +import java.util.function.Supplier; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * A source implementation to pull messages from GCP Pub/Sub into Flink.
+ * + * <p>The {@link PubSubSourceEnumerator} assigns a static {@link PubSubSplit} to every {@link + * PubSubSourceReader} that joins. The split does not contain any split-specific information because + * Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing + * partitions or offsets. However, Pub/Sub will automatically load-balance messages between multiple + * readers using same subscription. + * + * <p>A {@link PubSubSource} can be constructed through the {@link PubSubSourceBuilder} like so: + * + * <pre>{@code + * PubSubSource.<String>builder() + * // The deserialization schema to deserialize Pub/Sub messages + * .setDeserializationSchema(new SimpleStringSchema()) + * // The name string of your Pub/Sub project + * .setProjectName(PROJECT_NAME) + * // The name string of the subscription you would like to receive messages from + * .setSubscriptionName(SUBSCRIPTION_NAME) + * // An instance of com.google.auth.Credentials to authenticate against Google Cloud + * .setCredentials(CREDENTIALS) + * .setPubSubSubscriberFactory( + * // The maximum number of messages that should be pulled in one go + * 3, + * // The timeout after which a message pull request is deemed a failure + * Duration.ofSeconds(1), + * // The number of times the reception of a message should be retried in case of failure + * 10) + * .build(); + * }</pre> + * + * <p>More details can be found at {@link PubSubSourceBuilder} + * + * @param <OUT> The output type of the source. + */ +public class PubSubSource<OUT> Review Comment: missing tag, `@PublicEvolving` or `@Public` ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.enumerator; + +/** + * A stub to contain the checkpoint data of a {@link PubSubSourceEnumerator}. GCP Pub/Sub does not + * expose any partitions or similar concepts which would need handling by the enumerator. Therefore, + * there are no offsets or other data that could be saved in a checkpoint. + */ +public class PubSubEnumeratorState {} Review Comment: missing annotation, please address all ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
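For reference on the annotation comments in this thread, this is the shape being asked for, using Flink's stability annotations from `org.apache.flink.annotation` (the class names below are placeholders, not the connector's real classes):

```java
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;

// User-facing entry points (e.g. PubSubSource, PubSubSourceBuilder) get a stability tag.
@PublicEvolving
class UserFacingEntryPoint {}

// Runtime pieces (e.g. enumerator, enumerator state, readers, splits) are marked internal.
@Internal
class RuntimeInternal {}
```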
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; + +/** + * The enumerator for the {@link PubSubSource}. It does not do any work discovery as envisioned by + * FLIP-27 because GCP Pub/Sub hides partitions and other implementation details. + */ +public class PubSubSourceEnumerator implements SplitEnumerator<PubSubSplit, PubSubEnumeratorState> { + private final SplitEnumeratorContext<PubSubSplit> context; + + public PubSubSourceEnumerator(SplitEnumeratorContext<PubSubSplit> context) { + this.context = context; + } + + @Override + public void start() {} + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {} + + @Override + public void addSplitsBack(List<PubSubSplit> splits, int subtaskId) {} + + /** + * When a new reader joins, the enumerator actively assigns it with a generic {@link + * PubSubSplit} so that it can start pulling messages. + * + * @param subtaskId the subtask ID of the new source reader. + */ + @Override + public void addReader(int subtaskId) { + context.assignSplit(new PubSubSplit(), subtaskId); Review Comment: Why is this needed? ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Collector; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ReceivedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * A {@link SplitReader} to read from a given {@link PubSubSubscriber}. + * + * @param <T> the type of the record. + */ +public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class); + private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5; + private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000; + private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000; + private final PubSubDeserializationSchema<T> deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Credentials credentials; + private volatile PubSubSubscriber subscriber; + private final PubSubCollector collector; + + // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread + // processes messages and the fetcher thread acknowledges them, the thread-safe queue + // decouples them. + private final BlockingQueue<String> ackIdsQueue = + new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY); + private final Map<Long, List<String>> messageIdsToAcknowledge = new HashMap<>(); + + /** + * @param deserializationSchema a deserialization schema to apply to incoming message payloads. 
+ * @param pubSubSubscriberFactory a factory from which a new subscriber can be created from + * @param credentials the credentials to use for creating a new subscriber + */ + public PubSubSplitReader( + PubSubDeserializationSchema<T> deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Credentials credentials) { + + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.collector = new PubSubCollector(); + } + + @Override + public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException { + RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new RecordsBySplits.Builder<>(); + if (subscriber == null) { + synchronized (this) { Review Comment: IIUC, this is not needed; the split fetcher manager used is a `SingleThreadFetcherManager`. Also, I don't understand why this is lazily initialised. ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Collector; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ReceivedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * A {@link SplitReader} to read from a given {@link PubSubSubscriber}. + * + * @param <T> the type of the record.
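To illustrate the lazy-initialisation point above: with a single fetcher thread calling `fetch()`, plain lazy creation is sufficient, as in this self-contained sketch (the generic holder class is hypothetical):

```java
import java.util.function.Supplier;

// Minimal sketch: lazy creation without a volatile field or double-checked
// locking, valid as long as only the single split-fetcher thread calls get().
final class SingleThreadLazy<T> {
    private final Supplier<T> factory;
    private T value;

    SingleThreadLazy(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        if (value == null) {
            // e.g. () -> pubSubSubscriberFactory.getSubscriber(credentials)
            value = factory.get();
        }
        return value;
    }
}
```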
+ */ +public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class); + private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5; + private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000; + private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000; + private final PubSubDeserializationSchema<T> deserializationSchema; Review Comment: This shouldn't be here. Ideally, the record emitter's responsibility is to deserialise and convert the records; this inflates the split reader without a good cause. See the [kafka source](https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java) for reference. ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Collector; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ReceivedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * A {@link SplitReader} to read from a given {@link PubSubSubscriber}. + * + * @param <T> the type of the record.
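A hedged sketch of the KafkaRecordEmitter-style layout referenced above, where the emitter (not the split reader) applies the deserialization schema. The class itself is hypothetical; it reuses the PR's `PubSubDeserializationSchema#deserialize(PubsubMessage, Collector)` and `PubSubSplitState` types, with the split reader assumed to forward raw `ReceivedMessage` instances:

```java
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitState;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.util.Collector;

import com.google.pubsub.v1.ReceivedMessage;

// Hypothetical emitter: deserialization moves out of the split reader, mirroring
// KafkaRecordEmitter in flink-connector-kafka.
class DeserializingPubSubRecordEmitter<T>
        implements RecordEmitter<ReceivedMessage, T, PubSubSplitState> {

    private final PubSubDeserializationSchema<T> deserializationSchema;
    private final SourceOutputCollector<T> collector = new SourceOutputCollector<>();

    DeserializingPubSubRecordEmitter(PubSubDeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    @Override
    public void emitRecord(
            ReceivedMessage element, SourceOutput<T> output, PubSubSplitState splitState)
            throws Exception {
        collector.output = output;
        // Publish time in seconds, as in the PR; converting to millis may be needed.
        collector.timestamp = element.getMessage().getPublishTime().getSeconds();
        deserializationSchema.deserialize(element.getMessage(), collector);
    }

    // Bridges the Collector-based deserialization schema onto the SourceOutput.
    private static final class SourceOutputCollector<T> implements Collector<T> {
        private SourceOutput<T> output;
        private long timestamp;

        @Override
        public void collect(T record) {
            output.collect(record, timestamp);
        }

        @Override
        public void close() {}
    }
}
```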
+ */ +public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class); + private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5; + private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000; + private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000; + private final PubSubDeserializationSchema<T> deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Credentials credentials; + private volatile PubSubSubscriber subscriber; + private final PubSubCollector collector; + + // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread + // processes messages and the fetcher thread acknowledges them, the thread-safe queue + // decouples them. + private final BlockingQueue<String> ackIdsQueue = + new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY); + private final Map<Long, List<String>> messageIdsToAcknowledge = new HashMap<>(); + + /** + * @param deserializationSchema a deserialization schema to apply to incoming message payloads. + * @param pubSubSubscriberFactory a factory from which a new subscriber can be created from + * @param credentials the credentials to use for creating a new subscriber + */ + public PubSubSplitReader( + PubSubDeserializationSchema<T> deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Credentials credentials) { + + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.collector = new PubSubCollector(); + } + + @Override + public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException { + RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new RecordsBySplits.Builder<>(); + if (subscriber == null) { + synchronized (this) { + if (subscriber == null) { + subscriber = pubSubSubscriberFactory.getSubscriber(credentials); + } + } + } + + for (ReceivedMessage receivedMessage : subscriber.pull()) { + try { + // Deserialize messages into a collector so that logic in the user-provided + // deserialization schema decides how to map GCP Pub/Sub messages to records in + // Flink. This allows e.g. batching together multiple Flink records in a single GCP + // Pub/Sub message. + deserializationSchema.deserialize(receivedMessage.getMessage(), collector); + collector + .getMessages() + .forEach( + message -> + recordsBySplits.add( + PubSubSplit.SPLIT_ID, + new Tuple2<>( + message, + // A timestamp provided by GCP Pub/Sub + // indicating when the message was initially + // published + receivedMessage + .getMessage() + .getPublishTime() + .getSeconds()))); + } catch (Exception e) { Review Comment: Please use specific exceptions; we don't want to mask everything. ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitState; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** The source reader to read from GCP Pub/Sub. */ +public class PubSubSourceReader<T> + extends SingleThreadMultiplexSourceReaderBase< + Tuple2<T, Long>, T, PubSubSplit, PubSubSplitState> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSourceReader.class); + + public PubSubSourceReader( + FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, Long>>> elementsQueue, + Supplier<PubSubSplitReader<T>> splitReaderSupplier, + RecordEmitter<Tuple2<T, Long>, T, PubSubSplitState> recordEmitter, + Configuration config, + SourceReaderContext context) { + super( Review Comment: Deprecated constructor ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
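Regarding the deprecated-constructor comment above, a sketch of the queue-less wiring, assuming a flink-connector-base version that provides constructors without `FutureCompletingBlockingQueue` (the queue-taking variants being the deprecated ones; abstract method overrides omitted for brevity):

```java
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitState;

import java.util.function.Supplier;

// Hedged sketch: the elements queue is created inside connector-base, so the
// caller no longer constructs or passes it in.
public class PubSubSourceReader<T>
        extends SingleThreadMultiplexSourceReaderBase<
                Tuple2<T, Long>, T, PubSubSplit, PubSubSplitState> {

    public PubSubSourceReader(
            Supplier<PubSubSplitReader<T>> splitReaderSupplier,
            RecordEmitter<Tuple2<T, Long>, T, PubSubSplitState> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(splitReaderSupplier::get, recordEmitter, config, context);
    }

    // onSplitFinished / initializedState / toSplitType etc. as in the PR.
}
```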
+ * + */ + +package org.apache.flink.connector.gcp.pubsub.source; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorState; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubSourceEnumerator; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubRecordEmitter; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSourceReader; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSplitReader; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ProjectSubscriptionName; + +import java.io.IOException; +import java.time.Duration; +import java.util.Properties; +import java.util.function.Supplier; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * A source implementation to pull messages from GCP Pub/Sub into Flink. + * + * <p>The {@link PubSubSourceEnumerator} assigns a static {@link PubSubSplit} to every {@link + * PubSubSourceReader} that joins. The split does not contain any split-specific information because + * Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing + * partitions or offsets. However, Pub/Sub will automatically load-balance messages between multiple + * readers using same subscription. 
+ * + * <p>A {@link PubSubSource} can be constructed through the {@link PubSubSourceBuilder} like so: + * + * <pre>{@code + * PubSubSource.<String>builder() + * // The deserialization schema to deserialize Pub/Sub messages + * .setDeserializationSchema(new SimpleStringSchema()) + * // The name string of your Pub/Sub project + * .setProjectName(PROJECT_NAME) + * // The name string of the subscription you would like to receive messages from + * .setSubscriptionName(SUBSCRIPTION_NAME) + * // An instance of com.google.auth.Credentials to authenticate against Google Cloud + * .setCredentials(CREDENTIALS) + * .setPubSubSubscriberFactory( + * // The maximum number of messages that should be pulled in one go + * 3, + * // The timeout after which a message pull request is deemed a failure + * Duration.ofSeconds(1), + * // The number of times the reception of a message should be retried in case of failure + * 10) + * .build(); + * }</pre> + * + * <p>More details can be found at {@link PubSubSourceBuilder} + * + * @param <OUT> The output type of the source. + */ +public class PubSubSource<OUT> + implements Source<OUT, PubSubSplit, PubSubEnumeratorState>, ResultTypeQueryable<OUT> { + private final PubSubDeserializationSchema<OUT> deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Properties props; + private final Credentials credentials; + + private PubSubSource( + PubSubDeserializationSchema<OUT> deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Properties props, + Credentials credentials) { + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.props = props; + this.credentials = credentials; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext readerContext) { + FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> elementsQueue = + new FutureCompletingBlockingQueue<>(); + Supplier<PubSubSplitReader<OUT>> splitReaderSupplier = + () -> + new PubSubSplitReader<>( + deserializationSchema, pubSubSubscriberFactory, credentials); + PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>(); + + return new PubSubSourceReader<>( + elementsQueue, + splitReaderSupplier, + recordEmitter, + toConfiguration(props), + readerContext); + } + + @Override + public SplitEnumerator<PubSubSplit, PubSubEnumeratorState> createEnumerator( + SplitEnumeratorContext<PubSubSplit> enumContext) { + return new PubSubSourceEnumerator(enumContext); + } + + @Override + public SplitEnumerator<PubSubSplit, PubSubEnumeratorState> restoreEnumerator( + SplitEnumeratorContext<PubSubSplit> enumContext, PubSubEnumeratorState checkpoint) { + return new PubSubSourceEnumerator(enumContext); + } + + @Override + public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() { + return new PubSubSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer<PubSubEnumeratorState> getEnumeratorCheckpointSerializer() { + return new PubSubEnumeratorStateSerializer(); + } + + @Override + public TypeInformation<OUT> getProducedType() { + return deserializationSchema.getProducedType(); + } + + /** + * Get a builder to build a {@link PubSubSource}. + * + * @return A builder for a {@link PubSubSource}. 
+ */ + public static <OUT> PubSubSourceBuilder<OUT> builder() { + return new PubSubSourceBuilder<>(); + } + + /** @param <OUT> */ + public static class PubSubSourceBuilder<OUT> { + private static final int DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES = 3; + private static final int DEFAULT_PUBSUB_SUBSCRIBER_MAX_MESSAGES_PER_PULL = 100; + + private PubSubDeserializationSchema<OUT> deserializationSchema; + private String projectName; + private String subscriptionName; + private PubSubSubscriberFactory pubSubSubscriberFactory; + private Properties props; + private Credentials credentials; Review Comment: We should pass a credentials provider instead, to natively support dynamic credential discovery and refreshing (a sketch follows below). ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Collector; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ReceivedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * A {@link SplitReader} to read from a given {@link PubSubSubscriber}. + * + * @param <T> the type of the record. 
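A sketch of the credentials-provider suggestion above. `CredentialsProvider` and `FixedCredentialsProvider` come from `com.google.api.gax.core`; the builder methods shown here are hypothetical, not part of the PR.

    import com.google.api.gax.core.CredentialsProvider;
    import com.google.api.gax.core.FixedCredentialsProvider;

    // Hypothetical builder state: a provider instead of fixed credentials.
    private CredentialsProvider credentialsProvider;

    public PubSubSourceBuilder<OUT> setCredentialsProvider(CredentialsProvider credentialsProvider) {
        // A provider can discover and refresh credentials dynamically, e.g.
        // from the environment or an instance metadata server.
        this.credentialsProvider = credentialsProvider;
        return this;
    }

    public PubSubSourceBuilder<OUT> setCredentials(Credentials credentials) {
        // Fixed credentials stay supported by wrapping them in a provider.
        return setCredentialsProvider(FixedCredentialsProvider.create(credentials));
    }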
+ */ +public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class); + private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5; + private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000; + private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000; + private final PubSubDeserializationSchema<T> deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Credentials credentials; + private volatile PubSubSubscriber subscriber; + private final PubSubCollector collector; + + // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread + // processes messages and the fetcher thread acknowledges them, the thread-safe queue + // decouples them. + private final BlockingQueue<String> ackIdsQueue = + new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY); + private final Map<Long, List<String>> messageIdsToAcknowledge = new HashMap<>(); + + /** + * @param deserializationSchema a deserialization schema to apply to incoming message payloads. + * @param pubSubSubscriberFactory a factory from which a new subscriber can be created from + * @param credentials the credentials to use for creating a new subscriber + */ + public PubSubSplitReader( + PubSubDeserializationSchema<T> deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Credentials credentials) { + + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.collector = new PubSubCollector(); + } + + @Override + public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException { + RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new RecordsBySplits.Builder<>(); + if (subscriber == null) { + synchronized (this) { + if (subscriber == null) { + subscriber = pubSubSubscriberFactory.getSubscriber(credentials); + } + } + } + + for (ReceivedMessage receivedMessage : subscriber.pull()) { + try { + // Deserialize messages into a collector so that logic in the user-provided + // deserialization schema decides how to map GCP Pub/Sub messages to records in + // Flink. This allows e.g. batching together multiple Flink records in a single GCP + // Pub/Sub message. + deserializationSchema.deserialize(receivedMessage.getMessage(), collector); + collector + .getMessages() + .forEach( + message -> + recordsBySplits.add( + PubSubSplit.SPLIT_ID, + new Tuple2<>( + message, + // A timestamp provided by GCP Pub/Sub + // indicating when the message was initially + // published + receivedMessage + .getMessage() + .getPublishTime() + .getSeconds()))); + } catch (Exception e) { + throw new IOException("Failed to deserialize received message due to", e); + } finally { + collector.reset(); + } + + enqueueAcknowledgementId(receivedMessage.getAckId()); + } + + return recordsBySplits.build(); + } + + /** + * Enqueue an acknowledgment ID to be acknowledged towards GCP Pub/Sub with retries. 
+ * + * @param ackId the ID of the message to acknowledge + */ + public void enqueueAcknowledgementId(String ackId) { + int retryCount = 0; + + while (retryCount < RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT) { + boolean enqueued = ackIdsQueue.offer(ackId); + if (!enqueued) { + retryCount++; + try { + Thread.sleep(RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Thread interrupted while waiting to enqueue acknowledgment ID.", e); + return; + } + } else { + return; + } + } + + LOG.warn( + "Queue is full. Unable to enqueue acknowledgment ID after " + + RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT + + " retries."); + } + + @Override + public void handleSplitsChanges(SplitsChange<PubSubSplit> splitsChanges) {} + + @Override + public void wakeUp() {} + + @Override + public void close() throws Exception { + if (subscriber != null) { + subscriber.close(); + } + } + + private class PubSubCollector implements Collector<T> { + private final List<T> messages = new ArrayList<>(); + + @Override + public void collect(T message) { + messages.add(message); + } + + @Override + public void close() {} + + private List<T> getMessages() { + return messages; + } + + private void reset() { + messages.clear(); + } + } + + /** + * Prepare for acknowledging messages received since the last checkpoint by draining the {@link + * #ackIdsQueue} into {@link #messageIdsToAcknowledge}. + * + * <p>Calling this method is enqueued by the {@link PubSubSourceFetcherManager} to snapshot + * state before a checkpoint. + * + * @param checkpointId the ID of the checkpoint for which to prepare for acknowledging messages + */ + public void prepareForAcknowledgement(long checkpointId) { Review Comment: Why are we acking after snapshot completion rather than on snapshotting? (See the note below.) ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
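For context on the question above: acknowledging only after checkpoint completion is what gives the source its at-least-once guarantee. If ack IDs were sent to Pub/Sub already during `snapshotState()` and the checkpoint subsequently failed, Pub/Sub would not redeliver those messages while Flink restarted from older state, so they would be lost. Schematically (simplified, not the PR's exact call sites):

    // on snapshotState(checkpointId): only stage the pending ack IDs
    prepareForAcknowledgement(checkpointId);

    // on notifyCheckpointComplete(checkpointId): the checkpoint is durable,
    // so it is now safe to tell Pub/Sub not to redeliver these messages
    acknowledgeMessages(checkpointId);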
+ * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Collector; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ReceivedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * A {@link SplitReader} to read from a given {@link PubSubSubscriber}. + * + * @param <T> the type of the record. + */ +public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class); + private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5; + private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000; + private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000; + private final PubSubDeserializationSchema<T> deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Credentials credentials; + private volatile PubSubSubscriber subscriber; + private final PubSubCollector collector; + + // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread + // processes messages and the fetcher thread acknowledges them, the thread-safe queue + // decouples them. + private final BlockingQueue<String> ackIdsQueue = + new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY); + private final Map<Long, List<String>> messageIdsToAcknowledge = new HashMap<>(); + + /** + * @param deserializationSchema a deserialization schema to apply to incoming message payloads. 
+ * @param pubSubSubscriberFactory a factory from which a new subscriber can be created from + * @param credentials the credentials to use for creating a new subscriber + */ + public PubSubSplitReader( + PubSubDeserializationSchema<T> deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Credentials credentials) { + + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.collector = new PubSubCollector(); + } + + @Override + public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException { + RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new RecordsBySplits.Builder<>(); + if (subscriber == null) { + synchronized (this) { + if (subscriber == null) { + subscriber = pubSubSubscriberFactory.getSubscriber(credentials); + } + } + } + + for (ReceivedMessage receivedMessage : subscriber.pull()) { + try { + // Deserialize messages into a collector so that logic in the user-provided + // deserialization schema decides how to map GCP Pub/Sub messages to records in + // Flink. This allows e.g. batching together multiple Flink records in a single GCP + // Pub/Sub message. + deserializationSchema.deserialize(receivedMessage.getMessage(), collector); + collector + .getMessages() + .forEach( + message -> + recordsBySplits.add( + PubSubSplit.SPLIT_ID, + new Tuple2<>( + message, + // A timestamp provided by GCP Pub/Sub + // indicating when the message was initially + // published + receivedMessage + .getMessage() + .getPublishTime() + .getSeconds()))); + } catch (Exception e) { + throw new IOException("Failed to deserialize received message due to", e); + } finally { + collector.reset(); + } + + enqueueAcknowledgementId(receivedMessage.getAckId()); + } + + return recordsBySplits.build(); + } + + /** + * Enqueue an acknowledgment ID to be acknowledged towards GCP Pub/Sub with retries. + * + * @param ackId the ID of the message to acknowledge + */ + public void enqueueAcknowledgementId(String ackId) { + int retryCount = 0; + + while (retryCount < RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT) { + boolean enqueued = ackIdsQueue.offer(ackId); + if (!enqueued) { + retryCount++; + try { + Thread.sleep(RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Thread interrupted while waiting to enqueue acknowledgment ID.", e); + return; + } + } else { + return; + } + } + + LOG.warn( + "Queue is full. Unable to enqueue acknowledgment ID after " + + RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT + + " retries."); + } + + @Override + public void handleSplitsChanges(SplitsChange<PubSubSplit> splitsChanges) {} + + @Override + public void wakeUp() {} + + @Override + public void close() throws Exception { + if (subscriber != null) { + subscriber.close(); + } + } + + private class PubSubCollector implements Collector<T> { + private final List<T> messages = new ArrayList<>(); + + @Override + public void collect(T message) { + messages.add(message); + } + + @Override + public void close() {} + + private List<T> getMessages() { + return messages; + } + + private void reset() { + messages.clear(); + } + } + + /** + * Prepare for acknowledging messages received since the last checkpoint by draining the {@link + * #ackIdsQueue} into {@link #messageIdsToAcknowledge}. + * + * <p>Calling this method is enqueued by the {@link PubSubSourceFetcherManager} to snapshot + * state before a checkpoint. 
+ * + * @param checkpointId the ID of the checkpoint for which to prepare for acknowledging messages + */ + public void prepareForAcknowledgement(long checkpointId) { + List<String> ackIds = new ArrayList<>(); + ackIdsQueue.drainTo(ackIds); + messageIdsToAcknowledge.put(checkpointId, ackIds); + } + + /** + * Acknowledge the reception of messages towards GCP Pub/Sub since the last checkpoint. If a + * received message is not acknowledged before the subscription's acknowledgment timeout, GCP + * Pub/Sub will attempt to deliver it again. + * + * <p>Calling this method is enqueued by the {@link PubSubSourceFetcherManager} on checkpoint. + * + * @param checkpointId the ID of the checkpoint for which to acknowledge messages + */ + void acknowledgeMessages(long checkpointId) throws IOException { + if (subscriber == null) { + synchronized (this) { Review Comment: Again, this is not needed; the acknowledgment task should not be interrupted (a sketch follows below). ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Collector; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ReceivedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * A {@link SplitReader} to read from a given {@link PubSubSubscriber}. + * + * @param <T> the type of the record. 
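A sketch of what dropping the double-checked locking could look like, following the comment above. It assumes, as the reviewer argues, that the subscriber is only ever touched from the split fetcher's single thread; the helper method is hypothetical.

    private PubSubSubscriber getOrCreateSubscriber() throws IOException {
        // fetch() and acknowledgeMessages() both run as tasks on the single
        // fetcher thread, so plain lazy initialization is enough; the volatile
        // modifier and the synchronized block can then be removed.
        if (subscriber == null) {
            subscriber = pubSubSubscriberFactory.getSubscriber(credentials);
        }
        return subscriber;
    }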
+ */ +public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class); + private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5; + private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000; + private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000; + private final PubSubDeserializationSchema<T> deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Credentials credentials; + private volatile PubSubSubscriber subscriber; + private final PubSubCollector collector; + + // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread + // processes messages and the fetcher thread acknowledges them, the thread-safe queue + // decouples them. + private final BlockingQueue<String> ackIdsQueue = + new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY); + private final Map<Long, List<String>> messageIdsToAcknowledge = new HashMap<>(); + + /** + * @param deserializationSchema a deserialization schema to apply to incoming message payloads. + * @param pubSubSubscriberFactory a factory from which a new subscriber can be created from + * @param credentials the credentials to use for creating a new subscriber + */ + public PubSubSplitReader( + PubSubDeserializationSchema<T> deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Credentials credentials) { + + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.collector = new PubSubCollector(); + } + + @Override + public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException { + RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new RecordsBySplits.Builder<>(); + if (subscriber == null) { + synchronized (this) { + if (subscriber == null) { + subscriber = pubSubSubscriberFactory.getSubscriber(credentials); + } + } + } + + for (ReceivedMessage receivedMessage : subscriber.pull()) { + try { + // Deserialize messages into a collector so that logic in the user-provided + // deserialization schema decides how to map GCP Pub/Sub messages to records in + // Flink. This allows e.g. batching together multiple Flink records in a single GCP + // Pub/Sub message. + deserializationSchema.deserialize(receivedMessage.getMessage(), collector); + collector + .getMessages() + .forEach( + message -> + recordsBySplits.add( + PubSubSplit.SPLIT_ID, + new Tuple2<>( + message, + // A timestamp provided by GCP Pub/Sub + // indicating when the message was initially + // published + receivedMessage + .getMessage() + .getPublishTime() + .getSeconds()))); + } catch (Exception e) { + throw new IOException("Failed to deserialize received message due to", e); + } finally { + collector.reset(); + } + + enqueueAcknowledgementId(receivedMessage.getAckId()); + } + + return recordsBySplits.build(); + } + + /** + * Enqueue an acknowledgment ID to be acknowledged towards GCP Pub/Sub with retries. 
+ * + * @param ackId the ID of the message to acknowledge + */ + public void enqueueAcknowledgementId(String ackId) { + int retryCount = 0; + + while (retryCount < RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT) { + boolean enqueued = ackIdsQueue.offer(ackId); + if (!enqueued) { + retryCount++; + try { + Thread.sleep(RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Thread interrupted while waiting to enqueue acknowledgment ID.", e); + return; + } + } else { + return; + } + } + + LOG.warn( + "Queue is full. Unable to enqueue acknowledgment ID after " + + RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT + + " retries."); + } + + @Override + public void handleSplitsChanges(SplitsChange<PubSubSplit> splitsChanges) {} + + @Override + public void wakeUp() {} + + @Override + public void close() throws Exception { + if (subscriber != null) { + subscriber.close(); + } + } + + private class PubSubCollector implements Collector<T> { Review Comment: nit: move this inner class to the end of the enclosing class ########## flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java.orig: ########## @@ -0,0 +1,181 @@ +/* Review Comment: The file extension is not correct. Also, maybe we should rename it `EmulatedPubSubSourceV2Test` ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.function.Supplier; + +/** + * A custom {@link SingleThreadFetcherManager} so that the reception of GCP Pub/Sub messages can be + * acknowledged towards GCP Pub/Sub once they have been successfully checkpointed in Flink. As long + * as a received message has not been acknowledged, GCP Pub/Sub will attempt to deliver it again. 
+ */ +class PubSubSourceFetcherManager<T> + extends SingleThreadFetcherManager<Tuple2<T, Long>, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSourceFetcherManager.class); + + PubSubSourceFetcherManager( + FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, Long>>> elementsQueue, + Supplier<SplitReader<Tuple2<T, Long>, PubSubSplit>> splitReaderSupplier, + Configuration config) { + super(elementsQueue, splitReaderSupplier, config); Review Comment: Deprecated constructor ########## flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper; + +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */ +public class EmulatedPubSubSourceTest extends GCloudUnitTestBase { Review Comment: We should use [JUnit 5](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#tooling) (a migration sketch follows below) ########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
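A migration sketch for the JUnit 5 comment above, assuming `GCloudUnitTestBase` (or a successor) is migrated as well; the test logic itself is elided.

    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;

    /** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */
    class EmulatedPubSubSourceV2Test extends GCloudUnitTestBase {

        @BeforeEach
        void setUp() throws Exception {
            // create the emulator-backed topic and subscription
        }

        @AfterEach
        void tearDown() throws Exception {
            // drop the topic and subscription again
        }

        @Test
        void testFlinkSourceReceivesPublishedMessages() throws Exception {
            // publish messages, run the pipeline, assert on the received records
        }
    }

Note that JUnit 5 test classes and methods can be package-private, and `@Before`/`@After` become `@BeforeEach`/`@AfterEach`.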
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.function.Supplier; + +/** + * A custom {@link SingleThreadFetcherManager} so that the reception of GCP Pub/Sub messages can be + * acknowledged towards GCP Pub/Sub once they have been successfully checkpointed in Flink. As long + * as a received message has not been acknowledged, GCP Pub/Sub will attempt to deliver it again. + */ +class PubSubSourceFetcherManager<T> + extends SingleThreadFetcherManager<Tuple2<T, Long>, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSourceFetcherManager.class); + + PubSubSourceFetcherManager( + FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, Long>>> elementsQueue, + Supplier<SplitReader<Tuple2<T, Long>, PubSubSplit>> splitReaderSupplier, + Configuration config) { + super(elementsQueue, splitReaderSupplier, config); + } + + void prepareForAcknowledgement(long checkpointId) { + SplitFetcher<Tuple2<T, Long>, PubSubSplit> splitFetcher = fetchers.get(0); + + if (splitFetcher != null) { + enqueuePrepareForAcknowledgementTask(splitFetcher, checkpointId); + } else { + splitFetcher = createSplitFetcher(); + enqueuePrepareForAcknowledgementTask(splitFetcher, checkpointId); + startFetcher(splitFetcher); + } + } + + private void enqueuePrepareForAcknowledgementTask( Review Comment: We are calling this on `onCheckpointComplete`, where we enqueue a task that would possibly not start immediately. What happens if we fail during acking? If we move the acking responsibility to the source reader, that would be much cleaner, and we could make it synchronous (a sketch follows below). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
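A sketch of the synchronous alternative proposed in the final review comment, with acking driven from the source reader's checkpoint callback rather than from an enqueued fetcher task. The cast and the synchronous `acknowledgeMessages` method on the fetcher manager are hypothetical, and error handling is elided.

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Ack synchronously once the checkpoint is durable; a failure here
        // surfaces immediately instead of disappearing inside a queued task.
        ((PubSubSourceFetcherManager<T>) splitFetcherManager)
                .acknowledgeMessages(checkpointId);
    }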
