vahmed-hamdy commented on code in PR #2:
URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/2#discussion_r1627352870


##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorState;
+import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer;
+import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSourceReader;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSplitReader;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink.
+ *
+ * <p>The {@link PubSubSourceEnumerator} assigns a static {@link PubSubSplit} to every {@link
+ * PubSubSourceReader} that joins. The split does not contain any split-specific information because
+ * Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing
+ * partitions or offsets. However, Pub/Sub will automatically load-balance messages between multiple
+ * readers using the same subscription.
+ *
+ * <p>A {@link PubSubSource} can be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.<String>builder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .setDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .setProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive messages from
+ *         .setSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of com.google.auth.Credentials to authenticate against Google Cloud
+ *         .setCredentials(CREDENTIALS)
+ *         .setPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in one go
+ *                 3,
+ *                 // The timeout after which a message pull request is deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be retried in case of failure
+ *                 10)
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}.
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorState>, ResultTypeQueryable<OUT> {
+    private final PubSubDeserializationSchema<OUT> deserializationSchema;
+    private final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;

Review Comment:
   nit: rename `props`; is it client props, reader props, ...?



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorState;
+import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer;
+import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSourceReader;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSplitReader;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink.
+ *
+ * <p>The {@link PubSubSourceEnumerator} assigns a static {@link PubSubSplit} to every {@link
+ * PubSubSourceReader} that joins. The split does not contain any split-specific information because
+ * Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing
+ * partitions or offsets. However, Pub/Sub will automatically load-balance messages between multiple
+ * readers using the same subscription.
+ *
+ * <p>A {@link PubSubSource} can be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.<String>builder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .setDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .setProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive messages from
+ *         .setSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of com.google.auth.Credentials to authenticate against Google Cloud
+ *         .setCredentials(CREDENTIALS)
+ *         .setPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in one go
+ *                 3,
+ *                 // The timeout after which a message pull request is deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be retried in case of failure
+ *                 10)
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}.
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorState>, ResultTypeQueryable<OUT> {
+    private final PubSubDeserializationSchema<OUT> deserializationSchema;
+    private final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    private PubSubSource(
+            PubSubDeserializationSchema<OUT> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Properties props,
+            Credentials credentials) {
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.props = props;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext readerContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+                () ->
+                        new PubSubSplitReader<>(
+                                deserializationSchema, pubSubSubscriberFactory, credentials);
+        PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+        return new PubSubSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                toConfiguration(props),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorState> createEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorState> restoreEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext, PubSubEnumeratorState checkpoint) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+        return new PubSubSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubEnumeratorState> getEnumeratorCheckpointSerializer() {
+        return new PubSubEnumeratorStateSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    /**
+     * Get a builder to build a {@link PubSubSource}.
+     *
+     * @return A builder for a {@link PubSubSource}.
+     */
+    public static <OUT> PubSubSourceBuilder<OUT> builder() {
+        return new PubSubSourceBuilder<>();
+    }
+
+    /** @param <OUT> */
+    public static class PubSubSourceBuilder<OUT> {
+        private static final int DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES = 3;
+        private static final int DEFAULT_PUBSUB_SUBSCRIBER_MAX_MESSAGES_PER_PULL = 100;
+
+        private PubSubDeserializationSchema<OUT> deserializationSchema;
+        private String projectName;
+        private String subscriptionName;
+        private PubSubSubscriberFactory pubSubSubscriberFactory;
+        private Properties props;
+        private Credentials credentials;
+
+        private PubSubSourceBuilder() {
+            this.props = new Properties();
+        }
+
+        /**
+         * Set the DeserializationSchema used to deserialize incoming Pub/Sub messages. Use any
+         * {@link DeserializationSchema} to use in the {@link PubSubSource}. The schema will be
+         * wrapped automatically for compatibility with the source.
+         *
+         * @param deserializationSchema a deserialization schema to use.
+         */
+        public PubSubSourceBuilder<OUT> setDeserializationSchema(
+                DeserializationSchema<OUT> deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = new DeserializationSchemaWrapper<>(deserializationSchema);
+            return this;
+        }
+
+        /**
+         * Set the PubSubDeserializationSchema used to deserialize incoming Pub/Sub messages.
+         *
+         * @param deserializationSchema a deserialization schema to use.
+         */
+        public PubSubSourceBuilder<OUT> setDeserializationSchema(
+                PubSubDeserializationSchema<OUT> deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = deserializationSchema;
+            return this;
+        }
+
+        /** @param projectName the name string of your Pub/Sub project */
+        public PubSubSourceBuilder<OUT> setProjectName(String projectName) {
+            Preconditions.checkNotNull(projectName);
+            this.projectName = projectName;
+            return this;
+        }
+
+        /**
+         * @param subscriptionName the name string of the subscription you would like to receive
+         *     messages from
+         */
+        public PubSubSourceBuilder<OUT> setSubscriptionName(String subscriptionName) {
+            Preconditions.checkNotNull(subscriptionName);
+            this.subscriptionName = subscriptionName;
+            return this;
+        }
+
+        /**
+         * @param credentials an instance of {@link com.google.auth.Credentials} to authenticate
+         *     against Google Cloud
+         */
+        public PubSubSourceBuilder<OUT> setCredentials(Credentials credentials) {
+            this.credentials = credentials;
+            return this;
+        }
+
+        /** @param pubSubSubscriberFactory a custom factory to create Pub/Sub subscribers from */
+        public PubSubSourceBuilder<OUT> setPubSubSubscriberFactory(
+                PubSubSubscriberFactory pubSubSubscriberFactory) {
+            this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+            return this;
+        }
+
+        /**
+         * Create a parameterized {@link DefaultPubSubSubscriberFactory} and set it on the builder.
+         *
+         * @param maxMessagesPerPull The maximum number of messages that should be pulled in one go.
+         * @param perRequestTimeout The timeout per request from the subscriber
+         * @param retries The number of times the reception of a message should be retried in case
+         *     of failure.
+         */
+        public PubSubSourceBuilder<OUT> setPubSubSubscriberFactory(
+                int maxMessagesPerPull, Duration perRequestTimeout, int retries) {
+            this.pubSubSubscriberFactory =
+                    new DefaultPubSubSubscriberFactory(
+                            ProjectSubscriptionName.format(projectName, subscriptionName),
+                            retries,
+                            perRequestTimeout,
+                            maxMessagesPerPull);
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> setProps(Properties props) {
+            this.props = props;
+            return this;
+        }
+
+        public PubSubSource<OUT> build() throws IOException {
+            Preconditions.checkNotNull(
+                    deserializationSchema, "Deserialization schema must be provided.");
+            Preconditions.checkNotNull(
+                    projectName, "Google Cloud Pub/Sub projectName must be set.");
+            Preconditions.checkNotNull(
+                    subscriptionName, "Google Cloud Pub/Sub subscriptionName must be set.");
+
+            if (credentials == null) {
+                credentials = defaultCredentialsProviderBuilder().build().getCredentials();
+            }
+
+            if (pubSubSubscriberFactory == null) {
+                pubSubSubscriberFactory =
+                        new DefaultPubSubSubscriberFactory(
+                                ProjectSubscriptionName.format(projectName, subscriptionName),
+                                DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES,
+                                Duration.ofSeconds(15),

Review Comment:
   nit: why not expose this as a named default constant like the others?
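   
   For example (a hedged sketch; the constant name is my suggestion, not something from the PR):
   
   ```java
   // Hypothetical constant alongside the existing defaults in PubSubSourceBuilder:
   private static final Duration DEFAULT_PUBSUB_SUBSCRIBER_PER_REQUEST_TIMEOUT =
           Duration.ofSeconds(15);
   
   // ... which build() would then use instead of the inline Duration.ofSeconds(15):
   pubSubSubscriberFactory =
           new DefaultPubSubSubscriberFactory(
                   ProjectSubscriptionName.format(projectName, subscriptionName),
                   DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES,
                   DEFAULT_PUBSUB_SUBSCRIBER_PER_REQUEST_TIMEOUT,
                   DEFAULT_PUBSUB_SUBSCRIBER_MAX_MESSAGES_PER_PULL);
   ```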



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.gcp.pubsub.source.PubSubSource;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The enumerator for the {@link PubSubSource}. It does not do any work discovery as envisioned by
+ * FLIP-27 because GCP Pub/Sub hides partitions and other implementation details.
+ */
+public class PubSubSourceEnumerator implements SplitEnumerator<PubSubSplit, PubSubEnumeratorState> {

Review Comment:
   missing annotation, `@Internal`
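   
   A minimal sketch of the fix (assuming the standard Flink annotation):
   
   ```java
   import org.apache.flink.annotation.Internal;
   
   @Internal
   public class PubSubSourceEnumerator
           implements SplitEnumerator<PubSubSplit, PubSubEnumeratorState> {
   ```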



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Collector;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class);
+    private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5;
+    private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000;
+    private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000;
+    private final PubSubDeserializationSchema<T> deserializationSchema;
+    private final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Credentials credentials;
+    private volatile PubSubSubscriber subscriber;
+    private final PubSubCollector collector;
+
+    // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread
+    // processes messages and the fetcher thread acknowledges them, the thread-safe queue
+    // decouples them.
+    private final BlockingQueue<String> ackIdsQueue =
+            new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY);
+    private final Map<Long, List<String>> messageIdsToAcknowledge = new HashMap<>();
+
+    /**
+     * @param deserializationSchema a deserialization schema to apply to incoming message payloads.
+     * @param pubSubSubscriberFactory a factory from which a new subscriber can be created
+     * @param credentials the credentials to use for creating a new subscriber
+     */
+    public PubSubSplitReader(
+            PubSubDeserializationSchema<T> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Credentials credentials) {
+
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.credentials = credentials;
+        this.collector = new PubSubCollector();
+    }
+
+    @Override
+    public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException {
+        RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new RecordsBySplits.Builder<>();
+        if (subscriber == null) {
+            synchronized (this) {
+                if (subscriber == null) {
+                    subscriber = pubSubSubscriberFactory.getSubscriber(credentials);
+                }
+            }
+        }
+
+        for (ReceivedMessage receivedMessage : subscriber.pull()) {
+            try {
+                // Deserialize messages into a collector so that logic in the user-provided
+                // deserialization schema decides how to map GCP Pub/Sub messages to records in
+                // Flink. This allows e.g. batching together multiple Flink records in a single GCP
+                // Pub/Sub message.
+                deserializationSchema.deserialize(receivedMessage.getMessage(), collector);
+                collector
+                        .getMessages()
+                        .forEach(
+                                message ->
+                                        recordsBySplits.add(
+                                                PubSubSplit.SPLIT_ID,
+                                                new Tuple2<>(
+                                                        message,
+                                                        // A timestamp provided by GCP Pub/Sub
+                                                        // indicating when the message was initially
+                                                        // published
+                                                        receivedMessage

Review Comment:
   What is the timestamp used for? Maybe we could support [orderingKeys](https://cloud.google.com/pubsub/docs/ordering) instead.
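   
   For illustration, the ordering key is already on the message proto (a hedged sketch; whether to forward it instead of the publish time is the open question):
   
   ```java
   // Both accessors exist on com.google.pubsub.v1.PubsubMessage.
   String orderingKey = receivedMessage.getMessage().getOrderingKey();
   long publishSeconds = receivedMessage.getMessage().getPublishTime().getSeconds();
   ```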



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Collector;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class);
+    private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5;
+    private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000;
+    private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000;
+    private final PubSubDeserializationSchema<T> deserializationSchema;
+    private final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Credentials credentials;
+    private volatile PubSubSubscriber subscriber;
+    private final PubSubCollector collector;
+
+    // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread
+    // processes messages and the fetcher thread acknowledges them, the thread-safe queue
+    // decouples them.
+    private final BlockingQueue<String> ackIdsQueue =
+            new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY);
+    private final Map<Long, List<String>> messageIdsToAcknowledge = new HashMap<>();
+
+    /**
+     * @param deserializationSchema a deserialization schema to apply to incoming message payloads.
+     * @param pubSubSubscriberFactory a factory from which a new subscriber can be created
+     * @param credentials the credentials to use for creating a new subscriber
+     */
+    public PubSubSplitReader(
+            PubSubDeserializationSchema<T> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Credentials credentials) {
+
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.credentials = credentials;
+        this.collector = new PubSubCollector();
+    }
+
+    @Override
+    public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException {
+        RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new RecordsBySplits.Builder<>();
+        if (subscriber == null) {
+            synchronized (this) {
+                if (subscriber == null) {
+                    subscriber = pubSubSubscriberFactory.getSubscriber(credentials);
+                }
+            }
+        }
+
+        for (ReceivedMessage receivedMessage : subscriber.pull()) {
+            try {
+                // Deserialize messages into a collector so that logic in the user-provided
+                // deserialization schema decides how to map GCP Pub/Sub messages to records in
+                // Flink. This allows e.g. batching together multiple Flink records in a single GCP
+                // Pub/Sub message.
+                deserializationSchema.deserialize(receivedMessage.getMessage(), collector);
+                collector
+                        .getMessages()
+                        .forEach(
+                                message ->
+                                        recordsBySplits.add(
+                                                PubSubSplit.SPLIT_ID,
+                                                new Tuple2<>(
+                                                        message,
+                                                        // A timestamp provided by GCP Pub/Sub
+                                                        // indicating when the message was initially
+                                                        // published
+                                                        receivedMessage
+                                                                .getMessage()
+                                                                .getPublishTime()
+                                                                .getSeconds())));
+            } catch (Exception e) {
+                throw new IOException("Failed to deserialize received message due to", e);
+            } finally {
+                collector.reset();
+            }
+
+            enqueueAcknowledgementId(receivedMessage.getAckId());
+        }
+
+        return recordsBySplits.build();
+    }
+
+    /**
+     * Enqueue an acknowledgment ID to be acknowledged towards GCP Pub/Sub with retries.
+     *
+     * @param ackId the ID of the message to acknowledge
+     */
+    public void enqueueAcknowledgementId(String ackId) {

Review Comment:
   IMO, acking and checkpoint handling are no longer the responsibility of the SplitReader; we are inflating the class. If we remove deserialisation, the BlockingQueue would hold PubsubMessages with their ackIds, and the `PubSubSourceReader` would be responsible for the checkpoint operations.
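   
   Roughly what I mean (a hedged sketch, not a full design; `MessageWithAckId` is a made-up name):
   
   ```java
   // Element type the split reader could emit: the raw message plus its ack ID.
   // Deserialization would move to the record emitter, and the source reader
   // would collect ack IDs per checkpoint and acknowledge them on
   // notifyCheckpointComplete().
   final class MessageWithAckId {
       private final PubsubMessage message;
       private final String ackId;
   
       MessageWithAckId(PubsubMessage message, String ackId) {
           this.message = message;
           this.ackId = ackId;
       }
   
       PubsubMessage getMessage() {
           return message;
       }
   
       String getAckId() {
           return ackId;
       }
   }
   ```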



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/** The source reader to read from GCP Pub/Sub. */
+public class PubSubSourceReader<T>
+        extends SingleThreadMultiplexSourceReaderBase<
+                Tuple2<T, Long>, T, PubSubSplit, PubSubSplitState> {
+    private static final Logger LOG = LoggerFactory.getLogger(PubSubSourceReader.class);
+
+    public PubSubSourceReader(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, Long>>> elementsQueue,
+            Supplier<PubSubSplitReader<T>> splitReaderSupplier,
+            RecordEmitter<Tuple2<T, Long>, T, PubSubSplitState> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        super(
+                elementsQueue,
+                new PubSubSourceFetcherManager<>(elementsQueue, splitReaderSupplier::get, config),
+                recordEmitter,
+                config,
+                context);
+    }
+
+    @Override
+    protected void onSplitFinished(Map<String, PubSubSplitState> finishedSplitIds) {}
+
+    @Override
+    public List<PubSubSplit> snapshotState(long checkpointId) {
+        ((PubSubSourceFetcherManager<T>) splitFetcherManager)
+                .prepareForAcknowledgement(checkpointId);
+        return Collections.singletonList(new PubSubSplit());

Review Comment:
   Why are we returning anything here? IIUC the `PubSubSplit` is a no-data object?
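   
   If the split really carries no data, something like this might be enough (hedged; `SourceReaderBase#snapshotState` already returns the state of the registered splits):
   
   ```java
   @Override
   public List<PubSubSplit> snapshotState(long checkpointId) {
       ((PubSubSourceFetcherManager<T>) splitFetcherManager)
               .prepareForAcknowledgement(checkpointId);
       return super.snapshotState(checkpointId);
   }
   ```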



##########
flink-connector-gcp-pubsub/README.md:
##########
@@ -0,0 +1,98 @@
+# Flink Source for Google Cloud Pub/Sub

Review Comment:
   This should go to `docs`.



##########
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub;

Review Comment:
   It should be enough to add this to the e2e tests module.



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorState;
+import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer;
+import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSourceReader;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSplitReader;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink.
+ *
+ * <p>The {@link PubSubSourceEnumerator} assigns a static {@link PubSubSplit} to every {@link
+ * PubSubSourceReader} that joins. The split does not contain any split-specific information because
+ * Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing
+ * partitions or offsets. However, Pub/Sub will automatically load-balance messages between multiple
+ * readers using the same subscription.
+ *
+ * <p>A {@link PubSubSource} can be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.<String>builder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .setDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .setProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive messages from
+ *         .setSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of com.google.auth.Credentials to authenticate against Google Cloud
+ *         .setCredentials(CREDENTIALS)
+ *         .setPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in one go
+ *                 3,
+ *                 // The timeout after which a message pull request is deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be retried in case of failure
+ *                 10)
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}.
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>

Review Comment:
   missing annotation, `@PublicEvolving` or `@Public`
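   
   A minimal sketch (assuming `@PublicEvolving` is the right level for a new connector API):
   
   ```java
   import org.apache.flink.annotation.PublicEvolving;
   
   @PublicEvolving
   public class PubSubSource<OUT>
           implements Source<OUT, PubSubSplit, PubSubEnumeratorState>, ResultTypeQueryable<OUT> {
   ```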



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.enumerator;
+
+/**
+ * A stub to contain the checkpoint data of a {@link PubSubSourceEnumerator}. GCP Pub/Sub does not
+ * expose any partitions or similar concepts which would need handling by the enumerator. Therefore,
+ * there are no offsets or other data that could be saved in a checkpoint.
+ */
+public class PubSubEnumeratorState {}

Review Comment:
   missing annotation; please address all occurrences



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.gcp.pubsub.source.PubSubSource;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The enumerator for the {@link PubSubSource}. It does not do any work discovery as envisioned by
+ * FLIP-27 because GCP Pub/Sub hides partitions and other implementation details.
+ */
+public class PubSubSourceEnumerator implements SplitEnumerator<PubSubSplit, PubSubEnumeratorState> {
+    private final SplitEnumeratorContext<PubSubSplit> context;
+
+    public PubSubSourceEnumerator(SplitEnumeratorContext<PubSubSplit> context) {
+        this.context = context;
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}
+
+    @Override
+    public void addSplitsBack(List<PubSubSplit> splits, int subtaskId) {}
+
+    /**
+     * When a new reader joins, the enumerator actively assigns it with a generic {@link
+     * PubSubSplit} so that it can start pulling messages.
+     *
+     * @param subtaskId the subtask ID of the new source reader.
+     */
+    @Override
+    public void addReader(int subtaskId) {
+        context.assignSplit(new PubSubSplit(), subtaskId);

Review Comment:
   Why is this needed?



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Collector;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class);
+    private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5;
+    private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000;
+    private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000;
+    private final PubSubDeserializationSchema<T> deserializationSchema;
+    private final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Credentials credentials;
+    private volatile PubSubSubscriber subscriber;
+    private final PubSubCollector collector;
+
+    // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread
+    // processes messages and the fetcher thread acknowledges them, the thread-safe queue
+    // decouples them.
+    private final BlockingQueue<String> ackIdsQueue =
+            new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY);
+    private final Map<Long, List<String>> messageIdsToAcknowledge = new HashMap<>();
+
+    /**
+     * @param deserializationSchema a deserialization schema to apply to incoming message payloads.
+     * @param pubSubSubscriberFactory a factory from which a new subscriber can be created
+     * @param credentials the credentials to use for creating a new subscriber
+     */
+    public PubSubSplitReader(
+            PubSubDeserializationSchema<T> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Credentials credentials) {
+
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.credentials = credentials;
+        this.collector = new PubSubCollector();
+    }
+
+    @Override
+    public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException {
+        RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new RecordsBySplits.Builder<>();
+        if (subscriber == null) {
+            synchronized (this) {

Review Comment:
   IIUC, this is not needed: the SplitFetcher used is a `SingleThreadedSplitFetcherManager`. Also, I don't understand why this is lazily initialised.
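   
   A hedged sketch of the simpler alternative (create the subscriber eagerly in the constructor, no double-checked locking):
   
   ```java
   public PubSubSplitReader(
           PubSubDeserializationSchema<T> deserializationSchema,
           PubSubSubscriberFactory pubSubSubscriberFactory,
           Credentials credentials) {
       this.deserializationSchema = deserializationSchema;
       // Created once up front; fetch() is only ever called from the single fetcher thread.
       this.subscriber = pubSubSubscriberFactory.getSubscriber(credentials);
       this.collector = new PubSubCollector();
   }
   ```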



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Collector;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSubSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class);
+    private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5;
+    private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000;
+    private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000;
+    private final PubSubDeserializationSchema<T> deserializationSchema;

Review Comment:
   This shouldn't be here. Ideally, deserialising and converting the records is the record emitter's responsibility; keeping it here inflates the split reader without a good cause.
   
   - the [kafka source](https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java) for reference
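   
   Roughly what that could look like here (a hedged sketch only; the element type and constructor are my assumptions, in the spirit of the KafkaRecordEmitter linked above):
   
   ```java
   import org.apache.flink.api.connector.source.SourceOutput;
   import org.apache.flink.connector.base.source.reader.RecordEmitter;
   import org.apache.flink.util.Collector;
   
   import com.google.pubsub.v1.ReceivedMessage;
   
   // The split reader would forward raw ReceivedMessages; deserialization lives here.
   class PubSubRecordEmitter<T> implements RecordEmitter<ReceivedMessage, T, PubSubSplitState> {
       private final PubSubDeserializationSchema<T> deserializationSchema;
   
       PubSubRecordEmitter(PubSubDeserializationSchema<T> deserializationSchema) {
           this.deserializationSchema = deserializationSchema;
       }
   
       @Override
       public void emitRecord(
               ReceivedMessage element, SourceOutput<T> output, PubSubSplitState splitState)
               throws Exception {
           // Forward the Pub/Sub publish time as the event timestamp.
           long publishTime = element.getMessage().getPublishTime().getSeconds();
           deserializationSchema.deserialize(
                   element.getMessage(),
                   new Collector<T>() {
                       @Override
                       public void collect(T record) {
                           output.collect(record, publishTime);
                       }
   
                       @Override
                       public void close() {}
                   });
       }
   }
   ```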



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Collector;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, 
PubSubSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSplitReader.class);
+    private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5;
+    private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000;
+    private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000;
+    private final PubSubDeserializationSchema<T> deserializationSchema;
+    private final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Credentials credentials;
+    private volatile PubSubSubscriber subscriber;
+    private final PubSubCollector collector;
+
+    // Store the IDs of GCP Pub/Sub messages we have fetched & processed. 
Since the reader thread
+    // processes messages and the fetcher thread acknowledges them, the 
thread-safe queue
+    // decouples them.
+    private final BlockingQueue<String> ackIdsQueue =
+            new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY);
+    private final Map<Long, List<String>> messageIdsToAcknowledge = new 
HashMap<>();
+
+    /**
+     * @param deserializationSchema a deserialization schema to apply to 
incoming message payloads.
+     * @param pubSubSubscriberFactory a factory from which a new subscriber 
can be created from
+     * @param credentials the credentials to use for creating a new subscriber
+     */
+    public PubSubSplitReader(
+            PubSubDeserializationSchema<T> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Credentials credentials) {
+
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.credentials = credentials;
+        this.collector = new PubSubCollector();
+    }
+
+    @Override
+    public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException {
+        RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new 
RecordsBySplits.Builder<>();
+        if (subscriber == null) {
+            synchronized (this) {
+                if (subscriber == null) {
+                    subscriber = 
pubSubSubscriberFactory.getSubscriber(credentials);
+                }
+            }
+        }
+
+        for (ReceivedMessage receivedMessage : subscriber.pull()) {
+            try {
+                // Deserialize messages into a collector so that logic in the 
user-provided
+                // deserialization schema decides how to map GCP Pub/Sub 
messages to records in
+                // Flink. This allows e.g. batching together multiple Flink 
records in a single GCP
+                // Pub/Sub message.
+                
deserializationSchema.deserialize(receivedMessage.getMessage(), collector);
+                collector
+                        .getMessages()
+                        .forEach(
+                                message ->
+                                        recordsBySplits.add(
+                                                PubSubSplit.SPLIT_ID,
+                                                new Tuple2<>(
+                                                        message,
+                                                        // A timestamp 
provided by GCP Pub/Sub
+                                                        // indicating when the 
message was initially
+                                                        // published
+                                                        receivedMessage
+                                                                .getMessage()
+                                                                
.getPublishTime()
+                                                                
.getSeconds())));
+            } catch (Exception e) {

Review Comment:
   Please catch specific exceptions; we don't want to mask everything.
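
   For example (a sketch; `PubSubDeserializationSchema#deserialize` declares 
`throws Exception`, so a broad catch may still be needed as a last resort, but 
it should at least not swallow `IOException`s; `FlinkRuntimeException` here is 
`org.apache.flink.util.FlinkRuntimeException`):

   ```java
   try {
       deserializationSchema.deserialize(receivedMessage.getMessage(), collector);
       // ... add the collected records to recordsBySplits ...
   } catch (IOException e) {
       // I/O failures are already the declared failure mode of fetch(); rethrow.
       throw e;
   } catch (Exception e) {
       // Anything else is a genuine deserialization bug; name it as such instead
       // of masking every failure behind a generic IOException.
       throw new FlinkRuntimeException(
               "Failed to deserialize Pub/Sub message with ack id "
                       + receivedMessage.getAckId(),
               e);
   } finally {
       collector.reset();
   }
   ```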



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/** The source reader to read from GCP Pub/Sub. */
+public class PubSubSourceReader<T>
+        extends SingleThreadMultiplexSourceReaderBase<
+                Tuple2<T, Long>, T, PubSubSplit, PubSubSplitState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSourceReader.class);
+
+    public PubSubSourceReader(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, 
Long>>> elementsQueue,
+            Supplier<PubSubSplitReader<T>> splitReaderSupplier,
+            RecordEmitter<Tuple2<T, Long>, T, PubSubSplitState> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        super(

Review Comment:
   Deprecated constructor
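
   A sketch of the non-deprecated shape, assuming a flink-connector-base version 
with the FLINK-31324 constructors (where the base class owns the elements queue) 
and assuming `PubSubSourceFetcherManager` gains a matching queue-less 
constructor:

   ```java
   public PubSubSourceReader(
           Supplier<PubSubSplitReader<T>> splitReaderSupplier,
           RecordEmitter<Tuple2<T, Long>, T, PubSubSplitState> recordEmitter,
           Configuration config,
           SourceReaderContext context) {
       // No FutureCompletingBlockingQueue in connector code anymore; the fetcher
       // manager creates the queue internally.
       super(
               new PubSubSourceFetcherManager<>(splitReaderSupplier::get, config),
               recordEmitter,
               config,
               context);
   }
   ```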



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import 
org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorState;
+import 
org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer;
+import 
org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSourceReader;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSplitReader;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink.
+ *
+ * <p>The {@link PubSubSourceEnumerator} assigns a static {@link PubSubSplit} 
to every {@link
+ * PubSubSourceReader} that joins. The split does not contain any 
split-specific information because
+ * Pub/Sub does not allow subscribers to specify a "range" of messages to pull 
by providing
+ * partitions or offsets. However, Pub/Sub will automatically load-balance 
messages between multiple
+ * readers using same subscription.
+ *
+ * <p>A {@link PubSubSource} can be constructed through the {@link 
PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.<String>builder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .setDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .setProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive 
messages from
+ *         .setSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of com.google.auth.Credentials to authenticate 
against Google Cloud
+ *         .setCredentials(CREDENTIALS)
+ *         .setPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in 
one go
+ *                 3,
+ *                 // The timeout after which a message pull request is deemed 
a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be 
retried in case of failure
+ *                 10)
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorState>, 
ResultTypeQueryable<OUT> {
+    private final PubSubDeserializationSchema<OUT> deserializationSchema;
+    private final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    private PubSubSource(
+            PubSubDeserializationSchema<OUT> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Properties props,
+            Credentials credentials) {
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.props = props;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext 
readerContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+                () ->
+                        new PubSubSplitReader<>(
+                                deserializationSchema, 
pubSubSubscriberFactory, credentials);
+        PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+        return new PubSubSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                toConfiguration(props),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorState> 
createEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorState> 
restoreEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext, 
PubSubEnumeratorState checkpoint) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+        return new PubSubSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubEnumeratorState> 
getEnumeratorCheckpointSerializer() {
+        return new PubSubEnumeratorStateSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    /**
+     * Get a builder to build a {@link PubSubSource}.
+     *
+     * @return A builder for a {@link PubSubSource}.
+     */
+    public static <OUT> PubSubSourceBuilder<OUT> builder() {
+        return new PubSubSourceBuilder<>();
+    }
+
+    /** @param <OUT> */
+    public static class PubSubSourceBuilder<OUT> {
+        private static final int DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES = 
3;
+        private static final int 
DEFAULT_PUBSUB_SUBSCRIBER_MAX_MESSAGES_PER_PULL = 100;
+
+        private PubSubDeserializationSchema<OUT> deserializationSchema;
+        private String projectName;
+        private String subscriptionName;
+        private PubSubSubscriberFactory pubSubSubscriberFactory;
+        private Properties props;
+        private Credentials credentials;

Review Comment:
   We should pass a credentials provider instead, to natively support dynamic 
credentials discovery and refreshing.
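
   A sketch of what the builder could offer instead (the provider-based field 
and setter are hypothetical; `CredentialsProvider` and `FixedCredentialsProvider` 
are from `com.google.api.gax.core`):

   ```java
   import com.google.api.gax.core.CredentialsProvider;
   import com.google.api.gax.core.FixedCredentialsProvider;

   // Hypothetical builder additions.
   public PubSubSourceBuilder<OUT> setCredentialsProvider(CredentialsProvider credentialsProvider) {
       this.credentialsProvider = credentialsProvider;
       return this;
   }

   // Backwards-compatible overload for callers that hold static credentials.
   public PubSubSourceBuilder<OUT> setCredentials(Credentials credentials) {
       return setCredentialsProvider(FixedCredentialsProvider.create(credentials));
   }
   ```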



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Collector;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, 
PubSubSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSplitReader.class);
+    private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5;
+    private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000;
+    private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000;
+    private final PubSubDeserializationSchema<T> deserializationSchema;
+    private final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Credentials credentials;
+    private volatile PubSubSubscriber subscriber;
+    private final PubSubCollector collector;
+
+    // Store the IDs of GCP Pub/Sub messages we have fetched & processed. 
Since the reader thread
+    // processes messages and the fetcher thread acknowledges them, the 
thread-safe queue
+    // decouples them.
+    private final BlockingQueue<String> ackIdsQueue =
+            new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY);
+    private final Map<Long, List<String>> messageIdsToAcknowledge = new 
HashMap<>();
+
+    /**
+     * @param deserializationSchema a deserialization schema to apply to 
incoming message payloads.
+     * @param pubSubSubscriberFactory a factory from which a new subscriber 
can be created from
+     * @param credentials the credentials to use for creating a new subscriber
+     */
+    public PubSubSplitReader(
+            PubSubDeserializationSchema<T> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Credentials credentials) {
+
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.credentials = credentials;
+        this.collector = new PubSubCollector();
+    }
+
+    @Override
+    public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException {
+        RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new 
RecordsBySplits.Builder<>();
+        if (subscriber == null) {
+            synchronized (this) {
+                if (subscriber == null) {
+                    subscriber = 
pubSubSubscriberFactory.getSubscriber(credentials);
+                }
+            }
+        }
+
+        for (ReceivedMessage receivedMessage : subscriber.pull()) {
+            try {
+                // Deserialize messages into a collector so that logic in the 
user-provided
+                // deserialization schema decides how to map GCP Pub/Sub 
messages to records in
+                // Flink. This allows e.g. batching together multiple Flink 
records in a single GCP
+                // Pub/Sub message.
+                
deserializationSchema.deserialize(receivedMessage.getMessage(), collector);
+                collector
+                        .getMessages()
+                        .forEach(
+                                message ->
+                                        recordsBySplits.add(
+                                                PubSubSplit.SPLIT_ID,
+                                                new Tuple2<>(
+                                                        message,
+                                                        // A timestamp 
provided by GCP Pub/Sub
+                                                        // indicating when the 
message was initially
+                                                        // published
+                                                        receivedMessage
+                                                                .getMessage()
+                                                                
.getPublishTime()
+                                                                
.getSeconds())));
+            } catch (Exception e) {
+                throw new IOException("Failed to deserialize received message 
due to", e);
+            } finally {
+                collector.reset();
+            }
+
+            enqueueAcknowledgementId(receivedMessage.getAckId());
+        }
+
+        return recordsBySplits.build();
+    }
+
+    /**
+     * Enqueue an acknowledgment ID to be acknowledged towards GCP Pub/Sub 
with retries.
+     *
+     * @param ackId the ID of the message to acknowledge
+     */
+    public void enqueueAcknowledgementId(String ackId) {
+        int retryCount = 0;
+
+        while (retryCount < RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT) {
+            boolean enqueued = ackIdsQueue.offer(ackId);
+            if (!enqueued) {
+                retryCount++;
+                try {
+                    Thread.sleep(RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    LOG.error("Thread interrupted while waiting to enqueue 
acknowledgment ID.", e);
+                    return;
+                }
+            } else {
+                return;
+            }
+        }
+
+        LOG.warn(
+                "Queue is full. Unable to enqueue acknowledgment ID after "
+                        + RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT
+                        + " retries.");
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<PubSubSplit> splitsChanges) {}
+
+    @Override
+    public void wakeUp() {}
+
+    @Override
+    public void close() throws Exception {
+        if (subscriber != null) {
+            subscriber.close();
+        }
+    }
+
+    private class PubSubCollector implements Collector<T> {
+        private final List<T> messages = new ArrayList<>();
+
+        @Override
+        public void collect(T message) {
+            messages.add(message);
+        }
+
+        @Override
+        public void close() {}
+
+        private List<T> getMessages() {
+            return messages;
+        }
+
+        private void reset() {
+            messages.clear();
+        }
+    }
+
+    /**
+     * Prepare for acknowledging messages received since the last checkpoint 
by draining the {@link
+     * #ackIdsQueue} into {@link #messageIdsToAcknowledge}.
+     *
+     * <p>Calling this method is enqueued by the {@link 
PubSubSourceFetcherManager} to snapshot
+     * state before a checkpoint.
+     *
+     * @param checkpointId the ID of the checkpoint for which to prepare for 
acknowledging messages
+     */
+    public void prepareForAcknowledgement(long checkpointId) {

Review Comment:
   Why are we acking after snapshot completion rather than on snapshotting?



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Collector;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, 
PubSubSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSplitReader.class);
+    private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5;
+    private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000;
+    private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000;
+    private final PubSubDeserializationSchema<T> deserializationSchema;
+    private final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Credentials credentials;
+    private volatile PubSubSubscriber subscriber;
+    private final PubSubCollector collector;
+
+    // Store the IDs of GCP Pub/Sub messages we have fetched & processed. 
Since the reader thread
+    // processes messages and the fetcher thread acknowledges them, the 
thread-safe queue
+    // decouples them.
+    private final BlockingQueue<String> ackIdsQueue =
+            new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY);
+    private final Map<Long, List<String>> messageIdsToAcknowledge = new 
HashMap<>();
+
+    /**
+     * @param deserializationSchema a deserialization schema to apply to 
incoming message payloads.
+     * @param pubSubSubscriberFactory a factory from which a new subscriber 
can be created from
+     * @param credentials the credentials to use for creating a new subscriber
+     */
+    public PubSubSplitReader(
+            PubSubDeserializationSchema<T> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Credentials credentials) {
+
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.credentials = credentials;
+        this.collector = new PubSubCollector();
+    }
+
+    @Override
+    public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException {
+        RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new 
RecordsBySplits.Builder<>();
+        if (subscriber == null) {
+            synchronized (this) {
+                if (subscriber == null) {
+                    subscriber = 
pubSubSubscriberFactory.getSubscriber(credentials);
+                }
+            }
+        }
+
+        for (ReceivedMessage receivedMessage : subscriber.pull()) {
+            try {
+                // Deserialize messages into a collector so that logic in the 
user-provided
+                // deserialization schema decides how to map GCP Pub/Sub 
messages to records in
+                // Flink. This allows e.g. batching together multiple Flink 
records in a single GCP
+                // Pub/Sub message.
+                
deserializationSchema.deserialize(receivedMessage.getMessage(), collector);
+                collector
+                        .getMessages()
+                        .forEach(
+                                message ->
+                                        recordsBySplits.add(
+                                                PubSubSplit.SPLIT_ID,
+                                                new Tuple2<>(
+                                                        message,
+                                                        // A timestamp 
provided by GCP Pub/Sub
+                                                        // indicating when the 
message was initially
+                                                        // published
+                                                        receivedMessage
+                                                                .getMessage()
+                                                                
.getPublishTime()
+                                                                
.getSeconds())));
+            } catch (Exception e) {
+                throw new IOException("Failed to deserialize received message 
due to", e);
+            } finally {
+                collector.reset();
+            }
+
+            enqueueAcknowledgementId(receivedMessage.getAckId());
+        }
+
+        return recordsBySplits.build();
+    }
+
+    /**
+     * Enqueue an acknowledgment ID to be acknowledged towards GCP Pub/Sub 
with retries.
+     *
+     * @param ackId the ID of the message to acknowledge
+     */
+    public void enqueueAcknowledgementId(String ackId) {
+        int retryCount = 0;
+
+        while (retryCount < RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT) {
+            boolean enqueued = ackIdsQueue.offer(ackId);
+            if (!enqueued) {
+                retryCount++;
+                try {
+                    Thread.sleep(RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    LOG.error("Thread interrupted while waiting to enqueue 
acknowledgment ID.", e);
+                    return;
+                }
+            } else {
+                return;
+            }
+        }
+
+        LOG.warn(
+                "Queue is full. Unable to enqueue acknowledgment ID after "
+                        + RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT
+                        + " retries.");
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<PubSubSplit> splitsChanges) {}
+
+    @Override
+    public void wakeUp() {}
+
+    @Override
+    public void close() throws Exception {
+        if (subscriber != null) {
+            subscriber.close();
+        }
+    }
+
+    private class PubSubCollector implements Collector<T> {
+        private final List<T> messages = new ArrayList<>();
+
+        @Override
+        public void collect(T message) {
+            messages.add(message);
+        }
+
+        @Override
+        public void close() {}
+
+        private List<T> getMessages() {
+            return messages;
+        }
+
+        private void reset() {
+            messages.clear();
+        }
+    }
+
+    /**
+     * Prepare for acknowledging messages received since the last checkpoint 
by draining the {@link
+     * #ackIdsQueue} into {@link #messageIdsToAcknowledge}.
+     *
+     * <p>Calling this method is enqueued by the {@link 
PubSubSourceFetcherManager} to snapshot
+     * state before a checkpoint.
+     *
+     * @param checkpointId the ID of the checkpoint for which to prepare for 
acknowledging messages
+     */
+    public void prepareForAcknowledgement(long checkpointId) {
+        List<String> ackIds = new ArrayList<>();
+        ackIdsQueue.drainTo(ackIds);
+        messageIdsToAcknowledge.put(checkpointId, ackIds);
+    }
+
+    /**
+     * Acknowledge the reception of messages towards GCP Pub/Sub since the 
last checkpoint. If a
+     * received message is not acknowledged before the subscription's 
acknowledgment timeout, GCP
+     * Pub/Sub will attempt to deliver it again.
+     *
+     * <p>Calling this method is enqueued by the {@link 
PubSubSourceFetcherManager} on checkpoint.
+     *
+     * @param checkpointId the ID of the checkpoint for which to acknowledge 
messages
+     */
+    void acknowledgeMessages(long checkpointId) throws IOException {
+        if (subscriber == null) {
+            synchronized (this) {

Review Comment:
   Again, this is not needed; the acknowledgment task should not be interrupted.
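
   Roughly (a sketch; `getOrCreateSubscriber()` is a hypothetical helper that 
keeps the lazy creation from `fetch()` in one place, and this assumes the 
subscriber exposes `acknowledge(List<String>)` as in the existing connector):

   ```java
   void acknowledgeMessages(long checkpointId) throws IOException {
       // Runs as a fetcher task on the single fetcher thread, so no locking is
       // needed; remove(...) also keeps the map from growing without bound.
       List<String> ackIds = messageIdsToAcknowledge.remove(checkpointId);
       if (ackIds == null || ackIds.isEmpty()) {
           return;
       }
       getOrCreateSubscriber().acknowledge(ackIds);
   }
   ```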



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Collector;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, 
PubSubSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSplitReader.class);
+    private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5;
+    private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000;
+    private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000;
+    private final PubSubDeserializationSchema<T> deserializationSchema;
+    private final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Credentials credentials;
+    private volatile PubSubSubscriber subscriber;
+    private final PubSubCollector collector;
+
+    // Store the IDs of GCP Pub/Sub messages we have fetched & processed. 
Since the reader thread
+    // processes messages and the fetcher thread acknowledges them, the 
thread-safe queue
+    // decouples them.
+    private final BlockingQueue<String> ackIdsQueue =
+            new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY);
+    private final Map<Long, List<String>> messageIdsToAcknowledge = new 
HashMap<>();
+
+    /**
+     * @param deserializationSchema a deserialization schema to apply to 
incoming message payloads.
+     * @param pubSubSubscriberFactory a factory from which a new subscriber 
can be created from
+     * @param credentials the credentials to use for creating a new subscriber
+     */
+    public PubSubSplitReader(
+            PubSubDeserializationSchema<T> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Credentials credentials) {
+
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.credentials = credentials;
+        this.collector = new PubSubCollector();
+    }
+
+    @Override
+    public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException {
+        RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new 
RecordsBySplits.Builder<>();
+        if (subscriber == null) {
+            synchronized (this) {
+                if (subscriber == null) {
+                    subscriber = 
pubSubSubscriberFactory.getSubscriber(credentials);
+                }
+            }
+        }
+
+        for (ReceivedMessage receivedMessage : subscriber.pull()) {
+            try {
+                // Deserialize messages into a collector so that logic in the 
user-provided
+                // deserialization schema decides how to map GCP Pub/Sub 
messages to records in
+                // Flink. This allows e.g. batching together multiple Flink 
records in a single GCP
+                // Pub/Sub message.
+                
deserializationSchema.deserialize(receivedMessage.getMessage(), collector);
+                collector
+                        .getMessages()
+                        .forEach(
+                                message ->
+                                        recordsBySplits.add(
+                                                PubSubSplit.SPLIT_ID,
+                                                new Tuple2<>(
+                                                        message,
+                                                        // A timestamp 
provided by GCP Pub/Sub
+                                                        // indicating when the 
message was initially
+                                                        // published
+                                                        receivedMessage
+                                                                .getMessage()
+                                                                
.getPublishTime()
+                                                                
.getSeconds())));
+            } catch (Exception e) {
+                throw new IOException("Failed to deserialize received message 
due to", e);
+            } finally {
+                collector.reset();
+            }
+
+            enqueueAcknowledgementId(receivedMessage.getAckId());
+        }
+
+        return recordsBySplits.build();
+    }
+
+    /**
+     * Enqueue an acknowledgment ID to be acknowledged towards GCP Pub/Sub 
with retries.
+     *
+     * @param ackId the ID of the message to acknowledge
+     */
+    public void enqueueAcknowledgementId(String ackId) {
+        int retryCount = 0;
+
+        while (retryCount < RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT) {
+            boolean enqueued = ackIdsQueue.offer(ackId);
+            if (!enqueued) {
+                retryCount++;
+                try {
+                    Thread.sleep(RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    LOG.error("Thread interrupted while waiting to enqueue 
acknowledgment ID.", e);
+                    return;
+                }
+            } else {
+                return;
+            }
+        }
+
+        LOG.warn(
+                "Queue is full. Unable to enqueue acknowledgment ID after "
+                        + RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT
+                        + " retries.");
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<PubSubSplit> splitsChanges) {}
+
+    @Override
+    public void wakeUp() {}
+
+    @Override
+    public void close() throws Exception {
+        if (subscriber != null) {
+            subscriber.close();
+        }
+    }
+
+    private class PubSubCollector implements Collector<T> {

Review Comment:
   nit: move to end of class



##########
flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java.orig:
##########
@@ -0,0 +1,181 @@
+/*

Review Comment:
   The file extension is not correct. Also, maybe we should rename it 
`EmulatedPubSubSourceV2Test`.



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+/**
+ * A custom {@link SingleThreadFetcherManager} so that the reception of GCP 
Pub/Sub messages can be
+ * acknowledged towards GCP Pub/Sub once they have been successfully 
checkpointed in Flink. As long
+ * as a received message has not been acknowledged, GCP Pub/Sub will attempt 
to deliver it again.
+ */
+class PubSubSourceFetcherManager<T>
+        extends SingleThreadFetcherManager<Tuple2<T, Long>, PubSubSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSourceFetcherManager.class);
+
+    PubSubSourceFetcherManager(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, 
Long>>> elementsQueue,
+            Supplier<SplitReader<Tuple2<T, Long>, PubSubSplit>> 
splitReaderSupplier,
+            Configuration config) {
+        super(elementsQueue, splitReaderSupplier, config);

Review Comment:
   Deprecated constructor
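
   Same as in the reader; a sketch, assuming a flink-connector-base version with 
the FLINK-31324 constructors:

   ```java
   PubSubSourceFetcherManager(
           Supplier<SplitReader<Tuple2<T, Long>, PubSubSplit>> splitReaderSupplier,
           Configuration config) {
       // The non-deprecated super constructor creates the elements queue itself.
       super(splitReaderSupplier, config);
   }
   ```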



##########
flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.gcp.pubsub.source.PubSubSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
+import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */
+public class EmulatedPubSubSourceTest extends GCloudUnitTestBase {

Review Comment:
   We should use [JUnit 5](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#tooling).
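
   A sketch of the JUnit 5 skeleton (method names are illustrative, and 
`GCloudUnitTestBase` would need to migrate off JUnit 4 rules as well; the code 
style guide also recommends AssertJ over `org.junit.Assert`):

   ```java
   import org.junit.jupiter.api.AfterEach;
   import org.junit.jupiter.api.BeforeEach;
   import org.junit.jupiter.api.Test;

   import static org.assertj.core.api.Assertions.assertThat;

   class EmulatedPubSubSourceTest extends GCloudUnitTestBase {

       @BeforeEach
       void setUp() {
           // create the emulator topic and subscription (was @Before)
       }

       @AfterEach
       void tearDown() {
           // delete the emulator topic and subscription (was @After)
       }

       @Test
       void testFlinkSource() {
           // run the pipeline, then e.g.:
           // assertThat(output).containsExactlyInAnyOrderElementsOf(input);
       }
   }
   ```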
 
   



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+/**
+ * A custom {@link SingleThreadFetcherManager} so that the reception of GCP 
Pub/Sub messages can be
+ * acknowledged towards GCP Pub/Sub once they have been successfully 
checkpointed in Flink. As long
+ * as a received message has not been acknowledged, GCP Pub/Sub will attempt 
to deliver it again.
+ */
+class PubSubSourceFetcherManager<T>
+        extends SingleThreadFetcherManager<Tuple2<T, Long>, PubSubSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSourceFetcherManager.class);
+
+    PubSubSourceFetcherManager(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, 
Long>>> elementsQueue,
+            Supplier<SplitReader<Tuple2<T, Long>, PubSubSplit>> 
splitReaderSupplier,
+            Configuration config) {
+        super(elementsQueue, splitReaderSupplier, config);
+    }
+
+    void prepareForAcknowledgement(long checkpointId) {
+        SplitFetcher<Tuple2<T, Long>, PubSubSplit> splitFetcher = 
fetchers.get(0);
+
+        if (splitFetcher != null) {
+            enqueuePrepareForAcknowledgementTask(splitFetcher, checkpointId);
+        } else {
+            splitFetcher = createSplitFetcher();
+            enqueuePrepareForAcknowledgementTask(splitFetcher, checkpointId);
+            startFetcher(splitFetcher);
+        }
+    }
+
+    private void enqueuePrepareForAcknowledgementTask(

Review Comment:
   We are calling this on `onCheckpointComplete`, where we enqueue a task that 
might not start immediately. What happens if we fail during acking?
   If we move the acking responsibility to the source reader, that would be much 
cleaner and we could make it synchronous.
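
   One possible shape on the reader (a sketch; `prepareForAcknowledgement` is 
the method from this PR, while a synchronous `acknowledgeMessages` on the 
fetcher manager is an assumption, and thread-safety of the subscriber would 
need a closer look):

   ```java
   @Override
   public List<PubSubSplit> snapshotState(long checkpointId) {
       // Bind the ack ids fetched so far to this checkpoint while snapshotting.
       ((PubSubSourceFetcherManager<T>) splitFetcherManager)
               .prepareForAcknowledgement(checkpointId);
       return super.snapshotState(checkpointId);
   }

   @Override
   public void notifyCheckpointComplete(long checkpointId) throws Exception {
       // Ack synchronously so a failure surfaces here instead of disappearing
       // inside a queued fetcher task.
       ((PubSubSourceFetcherManager<T>) splitFetcherManager)
               .acknowledgeMessages(checkpointId);
   }
   ```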



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
