jakob-ed commented on a change in pull request #15152:
URL: https://github.com/apache/flink/pull/15152#discussion_r597332781
##########
File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A {@link PubSubSource} can
+ * be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.newBuilder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .withDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .withProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive messages from
+ *         .withSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of the com.google.auth.Credentials class to authenticate against Google Cloud
+ *         .withCredentials(CREDENTIALS)
+ *         .withPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in one go
+ *                 3,
+ *                 // The timeout after which the reception of a message is deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be retried in case of failure
+ *                 10)
+ *         .setProps(new Properties())
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found in {@link PubSubSourceBuilder}.
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorCheckpoint>, ResultTypeQueryable<OUT> {
+    protected final PubSubDeserializationSchema<OUT> deserializationSchema;
+    protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    PubSubSource(
+            PubSubDeserializationSchema<OUT> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Properties props,
+            Credentials credentials) {
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.props = props;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext readerContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+                () -> {
+                    try {
+                        return new PubSubSplitReader<>(
+                                deserializationSchema,
+                                pubSubSubscriberFactory.getSubscriber(credentials));
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        return null;
+                    }
+                };
+        PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+        return new PubSubSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                toConfiguration(props),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> createEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> restoreEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext,
+            PubSubEnumeratorCheckpoint checkpoint) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+        return new PubSubSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubEnumeratorCheckpoint>
+            getEnumeratorCheckpointSerializer() {
+        return new PubSubEnumeratorCheckpointSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    /**
+     * Get a builder to build a {@link PubSubSource}.
+     *
+     * @return A builder for a {@link PubSubSource}.
+     */
+    public static DeserializationSchemaBuilder newBuilder() {
+        return new DeserializationSchemaBuilder();
+    }
+
+    /** @param <OUT> */
+    public static class PubSubSourceBuilder<OUT>

Review comment:
   It can't be made private because some methods, like `withProjectName` and `withSubscriptionName`, have to be accessible to users.
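For context on why these types stay public: this is a staged ("step") builder, so the step methods form the public API while construction is still funneled through `newBuilder()`. A minimal, self-contained sketch of that pattern (generic names; `StagedSource`, `ProjectStep`, and `SubscriptionStep` are hypothetical stand-ins for the connector's builder types, not the PR code):

```java
// Simplified staged-builder sketch, not the actual PubSubSource builder.
public class StagedSource {
    private final String project;
    private final String subscription;

    private StagedSource(String project, String subscription) {
        this.project = project;
        this.subscription = subscription;
    }

    // Entry point: the only way to obtain a builder.
    public static ProjectStep newBuilder() {
        return new Builder();
    }

    // The step interfaces must be public, otherwise user code could not
    // chain withProject(...).withSubscription(...) across the stages.
    public interface ProjectStep {
        SubscriptionStep withProject(String project);
    }

    public interface SubscriptionStep {
        Builder withSubscription(String subscription);
    }

    public static class Builder implements ProjectStep, SubscriptionStep {
        private String project;
        private String subscription;

        private Builder() {} // construction gated through newBuilder()

        @Override
        public SubscriptionStep withProject(String project) {
            this.project = project;
            return this;
        }

        @Override
        public Builder withSubscription(String subscription) {
            this.subscription = subscription;
            return this;
        }

        public StagedSource build() {
            return new StagedSource(project, subscription);
        }
    }
}
```

The payoff of this design is that the compiler enforces that the required fields are supplied, in order, before `build()` is even reachable.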
##########
File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
   [... license header, imports, javadoc, and PubSubSource class body identical to the first hunk above ...]
+    /** @param <OUT> */
+    public static class PubSubSourceBuilder<OUT>
+            implements ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
+        private final PubSubDeserializationSchema<OUT> deserializationSchema;
+        private String projectName;
+        private String subscriptionName;
+
+        private PubSubSubscriberFactory pubSubSubscriberFactory;
+        private Properties props;
+        private Credentials credentials;
+
+        /**
+         * Use any {@link DeserializationSchema} with the {@link PubSubSource}. The schema will
+         * be wrapped automatically for compatibility with the source.
+         *
+         * @param deserializationSchema The deserialization schema to use.
+         */
+        private PubSubSourceBuilder(DeserializationSchema<OUT> deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = new DeserializationSchemaWrapper<>(deserializationSchema);
+        }
+
+        /**
+         * Use a {@link PubSubDeserializationSchema} for the {@link PubSubSource}.
+         *
+         * @param deserializationSchema The deserialization schema to use.
+         */
+        private PubSubSourceBuilder(PubSubDeserializationSchema<OUT> deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = deserializationSchema;
+        }
+
+        @Override
+        public SubscriptionNameBuilder<OUT> withProjectName(String projectName) {
+            Preconditions.checkNotNull(projectName);
+            this.projectName = projectName;
+            return this;
+        }
+
+        @Override
+        public PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName) {
+            Preconditions.checkNotNull(subscriptionName);
+            this.subscriptionName = subscriptionName;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials) {
+            this.credentials = credentials;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+                PubSubSubscriberFactory pubSubSubscriberFactory) {
+            this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+                int maxMessagesPerPull, Duration perRequestTimeout, int retries) {
+            this.pubSubSubscriberFactory =
+                    new DefaultPubSubSubscriberFactory(
+                            ProjectSubscriptionName.format(projectName, subscriptionName),
+                            retries,
+                            perRequestTimeout,
+                            maxMessagesPerPull);
+            return this;
+        }
+
+        public PubSubSourceBuilder setProps(Properties props) {
+            this.props = props;
+            return this;
+        }
+
+        public PubSubSource<OUT> build() throws IOException {
+            if (credentials == null) {
+                credentials = defaultCredentialsProviderBuilder().build().getCredentials();
+            }
+
+            if (pubSubSubscriberFactory == null) {
+                pubSubSubscriberFactory =
+                        new DefaultPubSubSubscriberFactory(
+                                ProjectSubscriptionName.format(projectName, subscriptionName),
+                                3,
+                                Duration.ofSeconds(15),
+                                100);
+            }
+
+            return new PubSubSource(
+                    deserializationSchema, pubSubSubscriberFactory, props, credentials);
+        }
+    }
+
+    /** Part of {@link PubSubSourceBuilder} to set required fields. */
+    public static class DeserializationSchemaBuilder {

Review comment:
   Like before, it can't be made private because `withDeserializationSchema` has to be accessible to users.

##########
File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSplitReader.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.util.Collector;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class);
+    private final PubSubSubscriber subscriber;
+    private final PubSubDeserializationSchema<T> deserializationSchema;
+    private final PubSubCollector collector;
+    // Store the IDs of GCP Pub/Sub messages that yet have to be acknowledged so that they are not
+    // resent.
+    private final List<String> messageIdsToAcknowledge = new ArrayList<>();
+
+    /**
+     * @param deserializationSchema a deserialization schema to apply to incoming message payloads.
+     * @param subscriber a subscriber object to read messages from.
+     */
+    public PubSubSplitReader(
+            PubSubDeserializationSchema deserializationSchema, PubSubSubscriber subscriber) {
+
+        this.subscriber = subscriber;
+        this.deserializationSchema = deserializationSchema;
+        this.collector = new PubSubCollector();
+    }
+
+    @Override
+    public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException {
+        RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new RecordsBySplits.Builder<>();
+
+        for (ReceivedMessage receivedMessage : subscriber.pull()) {
+            try {
+                // Deserialize messages into a collector so that logic in the user-provided
+                // deserialization schema decides how to map GCP Pub/Sub messages to records in
+                // Flink. This allows e.g. batching together multiple Flink records in a single GCP
+                // Pub/Sub message.
+                deserializationSchema.deserialize(receivedMessage.getMessage(), collector);
+                collector
+                        .getMessages()
+                        .forEach(
+                                message ->
+                                        recordsBySplits.add(
+                                                PubSubSplit.SPLIT_ID,
+                                                new Tuple2<>(
+                                                        message,
+                                                        // A timestamp provided by GCP Pub/Sub
+                                                        // indicating when the message was initially
+                                                        // published
+                                                        receivedMessage
+                                                                .getMessage()
+                                                                .getPublishTime()
+                                                                .getSeconds())));
+            } catch (Exception e) {
+                throw new IOException("Failed to deserialize received message due to", e);
+            } finally {
+                collector.reset();
+            }
+
+            messageIdsToAcknowledge.add(receivedMessage.getAckId());
+        }
+
+        if (collector.isEndOfStreamSignalled()) {
+            recordsBySplits.addFinishedSplit(PubSubSplit.SPLIT_ID);
+        }

Review comment:
   Nope, it doesn't; I will remove the end-of-stream logic.
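The collector indirection in `fetch()` is what enables the batching mentioned in the code comment: one Pub/Sub message can fan out into several Flink records. A standalone sketch of that mapping (a hypothetical newline-split payload; this does not implement the PR's schema interface):

```java
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

import java.util.Arrays;
import java.util.List;

// Hypothetical example of one Pub/Sub message carrying a batch of records,
// the way a collector-based deserialization schema can unpack it.
public class BatchedMessageExample {

    // Splits a newline-separated payload into individual records.
    static List<String> toRecords(PubsubMessage message) {
        return Arrays.asList(message.getData().toStringUtf8().split("\n"));
    }

    public static void main(String[] args) {
        PubsubMessage message =
                PubsubMessage.newBuilder()
                        .setData(ByteString.copyFromUtf8("first\nsecond\nthird"))
                        .build();
        // Prints: [first, second, third], i.e. three records from one message.
        System.out.println(toRecords(message));
    }
}
```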
##########
File path: flink-connectors/flink-connector-gcp-pubsub/pom.xml
##########
@@ -53,6 +53,11 @@ under the License.
 	</dependencyManagement>
 
 	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${project.version}</version>

Review comment:
   For a couple of imports from the connector base, e.g.
   https://github.com/apache/flink/pull/15152/files#diff-ba04e2277fad461b0c6fd8cca0d5db0a08cc01d1390827b2bb505feba696bd33R32
   The same dependency can also be found e.g. in the POM of the Kafka connector.
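Concretely, these are the `flink-connector-base` classes the new source builds on, all of which appear in the import blocks of the files quoted in this thread:

```java
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
```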
##########
File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+/**
+ * A custom {@link SingleThreadFetcherManager} so that the reception of GCP Pub/Sub messages can be
+ * acknowledged towards GCP Pub/Sub once they have been successfully checkpointed in Flink. As long
+ * as a received message has not been acknowledged, GCP Pub/Sub will attempt to deliver it again.
+ */
+public class PubSubSourceFetcherManager<T>
+        extends SingleThreadFetcherManager<Tuple2<T, Long>, PubSubSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(PubSubSourceFetcherManager.class);
+
+    public PubSubSourceFetcherManager(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, Long>>> elementsQueue,
+            Supplier<SplitReader<Tuple2<T, Long>, PubSubSplit>> splitReaderSupplier) {
+        super(elementsQueue, splitReaderSupplier);
+    }
+
+    /**
+     * Creates a {@link SplitFetcher} if there's none available yet and enqueues a task to
+     * acknowledge GCP Pub/Sub messages.
+     */
+    public void acknowledgeMessages() {
+        SplitFetcher<Tuple2<T, Long>, PubSubSplit> splitFetcher = fetchers.get(0);
+
+        if (splitFetcher != null) {
+            enqueueAcknowledgeMessageTask(splitFetcher);
+        } else {
+            splitFetcher = createSplitFetcher();
+            enqueueAcknowledgeMessageTask(splitFetcher);
+            startFetcher(splitFetcher);
+        }
+    }
+
+    /**
+     * Enqueues a task that, when run, notifies a {@link PubSubSplitReader} of a successful
+     * checkpoint so that GCP Pub/Sub messages received since the previous checkpoint can be
+     * acknowledged.
+     *
+     * @param splitFetcher the split fetcher on which the acknowledge task should be enqueued.
+     */
+    private void enqueueAcknowledgeMessageTask(
+            SplitFetcher<Tuple2<T, Long>, PubSubSplit> splitFetcher) {
+        PubSubSplitReader<T> pubSubSplitReader =
+                (PubSubSplitReader<T>) splitFetcher.getSplitReader();
+
+        splitFetcher.enqueueTask(
+                new SplitFetcherTask() {
+                    @Override
+                    public boolean run() throws IOException {
+                        pubSubSplitReader.notifyCheckpointComplete();

Review comment:
   I don't think so. Do you mean because of the `throws IOException` above? I removed it now, thanks!
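To make the acknowledge-on-checkpoint contract from the class javadoc concrete, here is a minimal standalone sketch of the bookkeeping. It is simplified and hypothetical; the PR's actual implementation runs the acknowledgement inside a `SplitFetcherTask` on the fetcher thread, as quoted above, and this sketch only illustrates the ordering guarantee:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration of at-least-once acknowledgement: ack IDs are buffered,
// frozen per checkpoint, and only released to Pub/Sub once that checkpoint completes.
public class AckOnCheckpointExample {
    // Ack IDs received since the last checkpoint barrier.
    private final List<String> pendingAckIds = new ArrayList<>();
    // Ack IDs frozen at each checkpoint, waiting for its completion.
    private final Map<Long, List<String>> ackIdsPerCheckpoint = new HashMap<>();

    void onMessagePulled(String ackId) {
        pendingAckIds.add(ackId);
    }

    void snapshotState(long checkpointId) {
        // Freeze everything received since the last barrier under this checkpoint.
        ackIdsPerCheckpoint.put(checkpointId, new ArrayList<>(pendingAckIds));
        pendingAckIds.clear();
    }

    void notifyCheckpointComplete(long checkpointId) {
        // Only now is acknowledging safe: the records are durably checkpointed, so
        // Pub/Sub no longer needs to redeliver them if the job fails.
        List<String> toAck = ackIdsPerCheckpoint.remove(checkpointId);
        if (toAck != null) {
            System.out.println("acknowledge " + toAck.size() + " messages");
        }
    }

    public static void main(String[] args) {
        AckOnCheckpointExample example = new AckOnCheckpointExample();
        example.onMessagePulled("ack-1");
        example.onMessagePulled("ack-2");
        example.snapshotState(7L);
        example.onMessagePulled("ack-3"); // belongs to the next checkpoint
        example.notifyCheckpointComplete(7L); // prints: acknowledge 2 messages
    }
}
```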
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]