fapaul commented on a change in pull request #15152:
URL: https://github.com/apache/flink/pull/15152#discussion_r593723616



##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md
##########
@@ -0,0 +1,78 @@
+# Flink Source for Google Cloud Pub/Sub
+
+This is a source implementation for receiving Google Cloud Pub/Sub messages in 
Flink with an at-least-once guarantee.
+
+## Installation
+
+Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub 
source:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-gcp-pubsub_2.11</artifactId>
+  <version>1.13-SNAPSHOT</version>
+</dependency>
+```
+
+## Usage
+
+Please keep in mind that the new source can be found in package 
`org.apache.flink.streaming.connectors.gcp.pubsub.source` while the old source 
implementation can still be found in 
`org.apache.flink.streaming.connectors.gcp.pubsub` (without `source` suffix). 
The API of the new source is not much different from the old one.
+
+To keep up the Google Cloud Pub/Sub at-least-once guarantee, messages are 
acknowledged against Pub/Sub when checkpointing succeeds. If a message is not 
acknowledged within a timeout (here `Duration.ofSeconds(1)`), Pub/Sub will 
attempt redelivery. To avoid unnecessary redelivery of successfully received 
messages, the timeout after which the reception of a message is deemed a 
failure (acknowledge deadline) should always be configured (much) *higher* than 
the checkpointing interval!
+
+```java
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource;
+
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Checkpointing must be enabled for the source to work so that messages can 
be acknowledged towards Pub/Sub
+env.enableCheckpointing(1000);
+
+// Parallelism > 1 may be set
+// env.setParallelism(4);
+
+PubSubSource<String> source =
+        PubSubSource.newBuilder()
+                // The deserialization schema to deserialize Pub/Sub messages
+                .withDeserializationSchema(new SimpleStringSchema())
+                // The name string of your Pub/Sub project
+                .withProjectName(PROJECT_NAME)
+                // The name string of the subscription you would like to 
receive messages from
+                .withSubscriptionName(SUBSCRIPTION_NAME)
+                // An instance of the com.google.auth.Credentials class to 
authenticate against Google Cloud
+                .withCredentials(CREDENTIALS)
+                .withPubSubSubscriberFactory(
+                        // The maximum number of messages that should be 
pulled in one go
+                        3,
+                        // The timeout after which the reception of a message 
is deemed a failure
+                        Duration.ofSeconds(1),
+                        // The number of times the reception of a message 
should be retried in case of failure
+                        10)
+                .setProps(new Properties())
+                .build();
+
+DataStream<String> fromPubSub =
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"pubsub-source");
+```
+
+## Internals
+
+#### Split Enumerator
+
+Pub/Sub doesn't expose any partitions to consuming applications. Therefore, 
the implementation of the `PubSubSourceEnumerator` doesn't do any real work 
discovery. Instead, a static `PubSubSplit` is handed to every 
`PubSubSourceReader` which requests a source split. This static source split 
doesn't contain split-specific information like partitions/offsets because this 
information can not be obtained.
+
+#### Source Reader
+
+A `PubSubSourceReader` uses Pub/Sub's pull mechanism to read new messages from 
the Pub/Sub subscription specified by the user. In the case of parallel-running 
source readers in Flink, every source reader is passed the same source split 
from the enumerator. Because of this, all source readers use the same 
connection details and the same Pub/Sub subscription to receive messages. In 
this case, Pub/Sub automatically load-balances messages between all source 
readers which pull from the same subscription. This way, parallel processing is 
achieved in the source.
+
+#### At-least-once guarantee
+
+Pub/Sub only guarantees at-least-once message delivery. This guarantee is kept 
up by the source as well. The mechanism that is used to achieve this is that 
Pub/Sub expects a message to be acknowledged by the subscriber to signal that 
the message has been consumed successfully. Any message that has not been 
acknowledged yet will be automatically redelivered by Pub/Sub once an ack 
deadline has passed.
+
+When a new checkpoint is written...
+- all messages pulled since the previous checkpoint are acknowledged to 
Pub/Sub and...
+- are forwarded to down-stream tasks

Review comment:
       We want to forward messages constantly and only acknowledge on 
checkpoint. This way we do not stop the dataflow but also do not violate the 
consistency guarantees because eventually the messages are acknowledged or the 
pipeline fails.

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A 
{@link PubSubSource} can

Review comment:
       I would also expect a quick explanation about the relation between your 
Source Reader + Enumerator. 

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md
##########
@@ -0,0 +1,78 @@
+# Flink Source for Google Cloud Pub/Sub
+
+This is a source implementation for receiving Google Cloud Pub/Sub messages in 
Flink with an at-least-once guarantee.
+
+## Installation
+
+Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub 
source:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-gcp-pubsub_2.11</artifactId>
+  <version>1.13-SNAPSHOT</version>
+</dependency>
+```
+
+## Usage
+
+Please keep in mind that the new source can be found in package 
`org.apache.flink.streaming.connectors.gcp.pubsub.source` while the old source 
implementation can still be found in 
`org.apache.flink.streaming.connectors.gcp.pubsub` (without `source` suffix). 
The API of the new source is not much different from the old one.
+
+To keep up the Google Cloud Pub/Sub at-least-once guarantee, messages are 
acknowledged against Pub/Sub when checkpointing succeeds. If a message is not 
acknowledged within a timeout (here `Duration.ofSeconds(1)`), Pub/Sub will 
attempt redelivery. To avoid unnecessary redelivery of successfully received 
messages, the timeout after which the reception of a message is deemed a 
failure (acknowledge deadline) should always be configured (much) *higher* than 
the checkpointing interval!

Review comment:
       What happens if the acknowledgement takes longer than the checkpoint?

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSplitReader.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.util.Collector;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, 
PubSubSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSplitReader.class);
+    private final PubSubSubscriber subscriber;
+    private final PubSubDeserializationSchema<T> deserializationSchema;
+    private final PubSubCollector collector;
+    // Store the IDs of GCP Pub/Sub messages that yet have to be acknowledged 
so that they are not
+    // resent.
+    private final List<String> messageIdsToAcknowledge = new ArrayList<>();
+
+    /**
+     * @param deserializationSchema a deserialization schema to apply to 
incoming message payloads.
+     * @param subscriber a subscriber object to read messages from.
+     */
+    public PubSubSplitReader(
+            PubSubDeserializationSchema deserializationSchema, 
PubSubSubscriber subscriber) {
+
+        this.subscriber = subscriber;
+        this.deserializationSchema = deserializationSchema;
+        this.collector = new PubSubCollector();
+    }
+
+    @Override
+    public RecordsWithSplitIds<Tuple2<T, Long>> fetch() throws IOException {
+        RecordsBySplits.Builder<Tuple2<T, Long>> recordsBySplits = new 
RecordsBySplits.Builder<>();
+
+        for (ReceivedMessage receivedMessage : subscriber.pull()) {
+            try {
+                // Deserialize messages into a collector so that logic in the 
user-provided
+                // deserialization schema decides how to map GCP Pub/Sub 
messages to records in
+                // Flink. This allows e.g. batching together multiple Flink 
records in a single GCP
+                // Pub/Sub message.
+                
deserializationSchema.deserialize(receivedMessage.getMessage(), collector);
+                collector
+                        .getMessages()
+                        .forEach(
+                                message ->
+                                        recordsBySplits.add(
+                                                PubSubSplit.SPLIT_ID,
+                                                new Tuple2<>(
+                                                        message,
+                                                        // A timestamp 
provided by GCP Pub/Sub
+                                                        // indicating when the 
message was initially
+                                                        // published
+                                                        receivedMessage
+                                                                .getMessage()
+                                                                
.getPublishTime()
+                                                                
.getSeconds())));
+            } catch (Exception e) {
+                throw new IOException("Failed to deserialize received message 
due to", e);
+            } finally {
+                collector.reset();
+            }
+
+            messageIdsToAcknowledge.add(receivedMessage.getAckId());
+        }
+
+        if (collector.isEndOfStreamSignalled()) {
+            recordsBySplits.addFinishedSplit(PubSubSplit.SPLIT_ID);
+        }

Review comment:
       Does this fully work? If not I'd propose we remove the 
`isEndOfStreamSignalled` handling completely and simplifying the deserializer. 

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md
##########
@@ -0,0 +1,78 @@
+# Flink Source for Google Cloud Pub/Sub
+
+This is a source implementation for receiving Google Cloud Pub/Sub messages in 
Flink with an at-least-once guarantee.
+
+## Installation
+
+Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub 
source:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-gcp-pubsub_2.11</artifactId>

Review comment:
       ```suggestion
     <artifactId>flink-connector-gcp-pubsub_${scala.binary.version}</artifactId>
   ```

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A 
{@link PubSubSource} can
+ * be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.newBuilder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .withDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .withProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive 
messages from
+ *         .withSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of the com.google.auth.Credentials class to 
authenticate against Google Cloud
+ *         .withCredentials(CREDENTIALS)
+ *         .withPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in 
one go
+ *                 3,
+ *                 // The timeout after which the reception of a message is 
deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be 
retried in case of failure
+ *                 10)
+ *         .setProps(new Properties())
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorCheckpoint>, 
ResultTypeQueryable<OUT> {
+    protected final PubSubDeserializationSchema<OUT> deserializationSchema;
+    protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    PubSubSource(

Review comment:
       private?

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A 
{@link PubSubSource} can
+ * be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.newBuilder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .withDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .withProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive 
messages from
+ *         .withSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of the com.google.auth.Credentials class to 
authenticate against Google Cloud
+ *         .withCredentials(CREDENTIALS)
+ *         .withPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in 
one go
+ *                 3,
+ *                 // The timeout after which the reception of a message is 
deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be 
retried in case of failure
+ *                 10)
+ *         .setProps(new Properties())
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorCheckpoint>, 
ResultTypeQueryable<OUT> {
+    protected final PubSubDeserializationSchema<OUT> deserializationSchema;
+    protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    PubSubSource(
+            PubSubDeserializationSchema<OUT> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Properties props,
+            Credentials credentials) {
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.props = props;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext 
readerContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+                () -> {
+                    try {
+                        return new PubSubSplitReader<>(
+                                deserializationSchema,
+                                
pubSubSubscriberFactory.getSubscriber(credentials));

Review comment:
       You can lazy initialize the subscriber `PubSubSplitReader#fetch` and let 
the IOException propagate.

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A 
{@link PubSubSource} can
+ * be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.newBuilder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .withDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .withProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive 
messages from
+ *         .withSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of the com.google.auth.Credentials class to 
authenticate against Google Cloud
+ *         .withCredentials(CREDENTIALS)
+ *         .withPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in 
one go
+ *                 3,
+ *                 // The timeout after which the reception of a message is 
deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be 
retried in case of failure
+ *                 10)
+ *         .setProps(new Properties())
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorCheckpoint>, 
ResultTypeQueryable<OUT> {
+    protected final PubSubDeserializationSchema<OUT> deserializationSchema;
+    protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    PubSubSource(
+            PubSubDeserializationSchema<OUT> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Properties props,
+            Credentials credentials) {
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.props = props;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext 
readerContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+                () -> {
+                    try {
+                        return new PubSubSplitReader<>(
+                                deserializationSchema,
+                                
pubSubSubscriberFactory.getSubscriber(credentials));
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        return null;
+                    }
+                };
+        PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+        return new PubSubSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                toConfiguration(props),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
createEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
restoreEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext,
+            PubSubEnumeratorCheckpoint checkpoint) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+        return new PubSubSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubEnumeratorCheckpoint>
+            getEnumeratorCheckpointSerializer() {
+        return new PubSubEnumeratorCheckpointSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    /**
+     * Get a builder to build a {@link PubSubSource}.
+     *
+     * @return A builder for a @{link PubSubSource}.
+     */
+    public static DeserializationSchemaBuilder newBuilder() {
+        return new DeserializationSchemaBuilder();
+    }
+
+    /** @param <OUT> */
+    public static class PubSubSourceBuilder<OUT>
+            implements ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
+        private final PubSubDeserializationSchema<OUT> deserializationSchema;
+        private String projectName;
+        private String subscriptionName;
+
+        private PubSubSubscriberFactory pubSubSubscriberFactory;
+        private Properties props;
+        private Credentials credentials;
+
+        /**
+         * Use any {@link DeserializationSchema} to use in the {@link 
PubSubSource}. The schema will
+         * be wrapped automatically for compatibility with the source.
+         *
+         * @param deserializationSchema The deserialization schema to use.
+         */
+        private PubSubSourceBuilder(DeserializationSchema<OUT> 
deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = new 
DeserializationSchemaWrapper<>(deserializationSchema);
+        }
+
+        /**
+         * Use a {@link PubSubDeserializationSchema} for the {@link 
PubSubSource}.
+         *
+         * @param deserializationSchema The deserialization schema to use.
+         */
+        private PubSubSourceBuilder(PubSubDeserializationSchema<OUT> 
deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = deserializationSchema;
+        }
+
+        @Override
+        public SubscriptionNameBuilder<OUT> withProjectName(String 
projectName) {
+            Preconditions.checkNotNull(projectName);
+            this.projectName = projectName;
+            return this;
+        }
+
+        @Override
+        public PubSubSourceBuilder<OUT> withSubscriptionName(String 
subscriptionName) {
+            Preconditions.checkNotNull(subscriptionName);
+            this.subscriptionName = subscriptionName;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withCredentials(Credentials 
credentials) {
+            this.credentials = credentials;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+                PubSubSubscriberFactory pubSubSubscriberFactory) {
+            this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+                int maxMessagesPerPull, Duration perRequestTimeout, int 
retries) {
+            this.pubSubSubscriberFactory =
+                    new DefaultPubSubSubscriberFactory(
+                            ProjectSubscriptionName.format(projectName, 
subscriptionName),
+                            retries,
+                            perRequestTimeout,
+                            maxMessagesPerPull);
+            return this;
+        }
+
+        public PubSubSourceBuilder setProps(Properties props) {
+            this.props = props;
+            return this;
+        }
+
+        public PubSubSource<OUT> build() throws IOException {
+            if (credentials == null) {
+                credentials = 
defaultCredentialsProviderBuilder().build().getCredentials();
+            }
+
+            if (pubSubSubscriberFactory == null) {
+                pubSubSubscriberFactory =
+                        new DefaultPubSubSubscriberFactory(
+                                ProjectSubscriptionName.format(projectName, 
subscriptionName),
+                                3,
+                                Duration.ofSeconds(15),
+                                100);
+            }
+
+            return new PubSubSource(
+                    deserializationSchema, pubSubSubscriberFactory, props, 
credentials);
+        }
+    }
+
+    /** Part of {@link PubSubSourceBuilder} to set required fields. */
+    public static class DeserializationSchemaBuilder {

Review comment:
       private

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorCheckpoint.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator;
+
+/**
+ * A stub to contain the checkpoint data of a {@link PubSubSourceEnumerator}. 
GCP Pub/Sub does not
+ * expose any partitions or similar concepts which would need handling by the 
enumerator. Therefore,
+ * there are no offsets or other data that could be saved in a checkpoint.
+ */
+public class PubSubEnumeratorCheckpoint {}

Review comment:
       ```suggestion
   public class PubSubEnumeratorState {}
   ```

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A 
{@link PubSubSource} can
+ * be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.newBuilder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .withDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .withProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive 
messages from
+ *         .withSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of the com.google.auth.Credentials class to 
authenticate against Google Cloud
+ *         .withCredentials(CREDENTIALS)
+ *         .withPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in 
one go
+ *                 3,
+ *                 // The timeout after which the reception of a message is 
deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be 
retried in case of failure
+ *                 10)
+ *         .setProps(new Properties())
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorCheckpoint>, 
ResultTypeQueryable<OUT> {
+    protected final PubSubDeserializationSchema<OUT> deserializationSchema;
+    protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    PubSubSource(
+            PubSubDeserializationSchema<OUT> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Properties props,
+            Credentials credentials) {
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.props = props;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext 
readerContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+                () -> {
+                    try {
+                        return new PubSubSplitReader<>(
+                                deserializationSchema,
+                                
pubSubSubscriberFactory.getSubscriber(credentials));
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        return null;
+                    }
+                };
+        PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+        return new PubSubSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                toConfiguration(props),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
createEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
restoreEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext,
+            PubSubEnumeratorCheckpoint checkpoint) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+        return new PubSubSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubEnumeratorCheckpoint>
+            getEnumeratorCheckpointSerializer() {
+        return new PubSubEnumeratorCheckpointSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    /**
+     * Get a builder to build a {@link PubSubSource}.
+     *
+     * @return A builder for a @{link PubSubSource}.
+     */
+    public static DeserializationSchemaBuilder newBuilder() {
+        return new DeserializationSchemaBuilder();
+    }
+
+    /** @param <OUT> */
+    public static class PubSubSourceBuilder<OUT>
+            implements ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
+        private final PubSubDeserializationSchema<OUT> deserializationSchema;
+        private String projectName;
+        private String subscriptionName;
+
+        private PubSubSubscriberFactory pubSubSubscriberFactory;
+        private Properties props;
+        private Credentials credentials;
+
+        /**
+         * Use any {@link DeserializationSchema} to use in the {@link 
PubSubSource}. The schema will
+         * be wrapped automatically for compatibility with the source.
+         *
+         * @param deserializationSchema The deserialization schema to use.
+         */
+        private PubSubSourceBuilder(DeserializationSchema<OUT> 
deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = new 
DeserializationSchemaWrapper<>(deserializationSchema);
+        }
+
+        /**
+         * Use a {@link PubSubDeserializationSchema} for the {@link 
PubSubSource}.
+         *
+         * @param deserializationSchema The deserialization schema to use.
+         */
+        private PubSubSourceBuilder(PubSubDeserializationSchema<OUT> 
deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = deserializationSchema;
+        }
+
+        @Override
+        public SubscriptionNameBuilder<OUT> withProjectName(String 
projectName) {
+            Preconditions.checkNotNull(projectName);
+            this.projectName = projectName;
+            return this;
+        }
+
+        @Override
+        public PubSubSourceBuilder<OUT> withSubscriptionName(String 
subscriptionName) {
+            Preconditions.checkNotNull(subscriptionName);
+            this.subscriptionName = subscriptionName;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withCredentials(Credentials 
credentials) {
+            this.credentials = credentials;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+                PubSubSubscriberFactory pubSubSubscriberFactory) {
+            this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+                int maxMessagesPerPull, Duration perRequestTimeout, int 
retries) {
+            this.pubSubSubscriberFactory =
+                    new DefaultPubSubSubscriberFactory(
+                            ProjectSubscriptionName.format(projectName, 
subscriptionName),
+                            retries,
+                            perRequestTimeout,
+                            maxMessagesPerPull);
+            return this;
+        }
+
+        public PubSubSourceBuilder setProps(Properties props) {
+            this.props = props;
+            return this;
+        }
+
+        public PubSubSource<OUT> build() throws IOException {
+            if (credentials == null) {
+                credentials = 
defaultCredentialsProviderBuilder().build().getCredentials();
+            }
+
+            if (pubSubSubscriberFactory == null) {
+                pubSubSubscriberFactory =
+                        new DefaultPubSubSubscriberFactory(
+                                ProjectSubscriptionName.format(projectName, 
subscriptionName),
+                                3,
+                                Duration.ofSeconds(15),
+                                100);
+            }
+
+            return new PubSubSource(
+                    deserializationSchema, pubSubSubscriberFactory, props, 
credentials);
+        }
+    }
+
+    /** Part of {@link PubSubSourceBuilder} to set required fields. */
+    public static class DeserializationSchemaBuilder {
+        /**
+         * Set the DeserializationSchema used to deserialize incoming 
PubSubMessages. If you want
+         * access to meta data of a PubSubMessage use the overloaded
+         * withDeserializationSchema({@link PubSubDeserializationSchema}) 
method instead.
+         */
+        public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
+                DeserializationSchema<OUT> deserializationSchema) {
+            return new PubSubSourceBuilder<>(deserializationSchema);
+        }
+
+        /** Set the DeserializationSchema used to deserialize incoming 
PubSubMessages. */
+        public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
+                PubSubDeserializationSchema<OUT> deserializationSchema) {
+            return new PubSubSourceBuilder<>(deserializationSchema);
+        }
+    }
+
+    /** Part of {@link PubSubSourceBuilder} to set required fields. */
+    public interface ProjectNameBuilder<OUT> {
+        /** Set the project name of the subscription to pull messages from. */
+        SubscriptionNameBuilder<OUT> withProjectName(String projectName);
+    }
+
+    /** Part of {@link PubSubSourceBuilder} to set required fields. */
+    public interface SubscriptionNameBuilder<OUT> {
+        /** Set the subscription name of the subscription to pull messages 
from. */
+        PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);
+    }

Review comment:
       What is the intention behind these interfaces?

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A 
{@link PubSubSource} can
+ * be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.newBuilder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .withDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .withProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive 
messages from
+ *         .withSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of the com.google.auth.Credentials class to 
authenticate against Google Cloud
+ *         .withCredentials(CREDENTIALS)
+ *         .withPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in 
one go
+ *                 3,
+ *                 // The timeout after which the reception of a message is 
deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be 
retried in case of failure
+ *                 10)
+ *         .setProps(new Properties())
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorCheckpoint>, 
ResultTypeQueryable<OUT> {
+    protected final PubSubDeserializationSchema<OUT> deserializationSchema;
+    protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    PubSubSource(
+            PubSubDeserializationSchema<OUT> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Properties props,
+            Credentials credentials) {
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.props = props;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext 
readerContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+                () -> {
+                    try {
+                        return new PubSubSplitReader<>(
+                                deserializationSchema,
+                                
pubSubSubscriberFactory.getSubscriber(credentials));
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        return null;
+                    }
+                };
+        PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+        return new PubSubSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                toConfiguration(props),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
createEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
restoreEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext,
+            PubSubEnumeratorCheckpoint checkpoint) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+        return new PubSubSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubEnumeratorCheckpoint>
+            getEnumeratorCheckpointSerializer() {
+        return new PubSubEnumeratorCheckpointSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    /**
+     * Get a builder to build a {@link PubSubSource}.
+     *
+     * @return A builder for a @{link PubSubSource}.
+     */
+    public static DeserializationSchemaBuilder newBuilder() {
+        return new DeserializationSchemaBuilder();
+    }
+
+    /** @param <OUT> */
+    public static class PubSubSourceBuilder<OUT>
+            implements ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
+        private final PubSubDeserializationSchema<OUT> deserializationSchema;
+        private String projectName;
+        private String subscriptionName;
+
+        private PubSubSubscriberFactory pubSubSubscriberFactory;
+        private Properties props;
+        private Credentials credentials;
+
+        /**
+         * Use any {@link DeserializationSchema} to use in the {@link 
PubSubSource}. The schema will
+         * be wrapped automatically for compatibility with the source.
+         *
+         * @param deserializationSchema The deserialization schema to use.
+         */
+        private PubSubSourceBuilder(DeserializationSchema<OUT> 
deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = new 
DeserializationSchemaWrapper<>(deserializationSchema);
+        }
+
+        /**
+         * Use a {@link PubSubDeserializationSchema} for the {@link 
PubSubSource}.
+         *
+         * @param deserializationSchema The deserialization schema to use.
+         */
+        private PubSubSourceBuilder(PubSubDeserializationSchema<OUT> 
deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = deserializationSchema;
+        }
+
+        @Override
+        public SubscriptionNameBuilder<OUT> withProjectName(String 
projectName) {
+            Preconditions.checkNotNull(projectName);
+            this.projectName = projectName;
+            return this;
+        }
+
+        @Override
+        public PubSubSourceBuilder<OUT> withSubscriptionName(String 
subscriptionName) {
+            Preconditions.checkNotNull(subscriptionName);
+            this.subscriptionName = subscriptionName;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withCredentials(Credentials 
credentials) {
+            this.credentials = credentials;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+                PubSubSubscriberFactory pubSubSubscriberFactory) {
+            this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+                int maxMessagesPerPull, Duration perRequestTimeout, int 
retries) {
+            this.pubSubSubscriberFactory =
+                    new DefaultPubSubSubscriberFactory(
+                            ProjectSubscriptionName.format(projectName, 
subscriptionName),
+                            retries,
+                            perRequestTimeout,
+                            maxMessagesPerPull);
+            return this;
+        }
+
+        public PubSubSourceBuilder setProps(Properties props) {
+            this.props = props;
+            return this;
+        }
+
+        public PubSubSource<OUT> build() throws IOException {
+            if (credentials == null) {
+                credentials = 
defaultCredentialsProviderBuilder().build().getCredentials();
+            }
+
+            if (pubSubSubscriberFactory == null) {
+                pubSubSubscriberFactory =
+                        new DefaultPubSubSubscriberFactory(
+                                ProjectSubscriptionName.format(projectName, 
subscriptionName),
+                                3,

Review comment:
       Use static constants for magic numbers to give a better explanation.

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A 
{@link PubSubSource} can
+ * be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.newBuilder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .withDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .withProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive 
messages from
+ *         .withSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of the com.google.auth.Credentials class to 
authenticate against Google Cloud
+ *         .withCredentials(CREDENTIALS)
+ *         .withPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in 
one go
+ *                 3,
+ *                 // The timeout after which the reception of a message is 
deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be 
retried in case of failure
+ *                 10)
+ *         .setProps(new Properties())
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorCheckpoint>, 
ResultTypeQueryable<OUT> {
+    protected final PubSubDeserializationSchema<OUT> deserializationSchema;
+    protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    PubSubSource(
+            PubSubDeserializationSchema<OUT> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Properties props,
+            Credentials credentials) {
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.props = props;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext 
readerContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+                () -> {
+                    try {
+                        return new PubSubSplitReader<>(
+                                deserializationSchema,
+                                
pubSubSubscriberFactory.getSubscriber(credentials));
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        return null;
+                    }
+                };
+        PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+        return new PubSubSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                toConfiguration(props),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
createEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
restoreEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext,
+            PubSubEnumeratorCheckpoint checkpoint) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+        return new PubSubSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubEnumeratorCheckpoint>
+            getEnumeratorCheckpointSerializer() {
+        return new PubSubEnumeratorCheckpointSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    /**
+     * Get a builder to build a {@link PubSubSource}.
+     *
+     * @return A builder for a @{link PubSubSource}.
+     */
+    public static DeserializationSchemaBuilder newBuilder() {
+        return new DeserializationSchemaBuilder();
+    }
+
+    /** @param <OUT> */
+    public static class PubSubSourceBuilder<OUT>

Review comment:
       private?

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A 
{@link PubSubSource} can
+ * be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.newBuilder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .withDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .withProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive 
messages from
+ *         .withSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of the com.google.auth.Credentials class to 
authenticate against Google Cloud
+ *         .withCredentials(CREDENTIALS)
+ *         .withPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in 
one go
+ *                 3,
+ *                 // The timeout after which the reception of a message is 
deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be 
retried in case of failure
+ *                 10)
+ *         .setProps(new Properties())
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorCheckpoint>, 
ResultTypeQueryable<OUT> {
+    protected final PubSubDeserializationSchema<OUT> deserializationSchema;
+    protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    PubSubSource(
+            PubSubDeserializationSchema<OUT> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Properties props,
+            Credentials credentials) {
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.props = props;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext 
readerContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+                () -> {
+                    try {
+                        return new PubSubSplitReader<>(
+                                deserializationSchema,
+                                
pubSubSubscriberFactory.getSubscriber(credentials));
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        return null;
+                    }
+                };
+        PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+        return new PubSubSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                toConfiguration(props),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
createEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
restoreEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext,
+            PubSubEnumeratorCheckpoint checkpoint) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+        return new PubSubSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubEnumeratorCheckpoint>
+            getEnumeratorCheckpointSerializer() {
+        return new PubSubEnumeratorCheckpointSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    /**
+     * Get a builder to build a {@link PubSubSource}.
+     *
+     * @return A builder for a @{link PubSubSource}.
+     */
+    public static DeserializationSchemaBuilder newBuilder() {
+        return new DeserializationSchemaBuilder();
+    }
+
+    /** @param <OUT> */
+    public static class PubSubSourceBuilder<OUT>
+            implements ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
+        private final PubSubDeserializationSchema<OUT> deserializationSchema;
+        private String projectName;
+        private String subscriptionName;
+
+        private PubSubSubscriberFactory pubSubSubscriberFactory;
+        private Properties props;
+        private Credentials credentials;
+
+        /**
+         * Use any {@link DeserializationSchema} to use in the {@link 
PubSubSource}. The schema will
+         * be wrapped automatically for compatibility with the source.
+         *
+         * @param deserializationSchema The deserialization schema to use.
+         */
+        private PubSubSourceBuilder(DeserializationSchema<OUT> 
deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = new 
DeserializationSchemaWrapper<>(deserializationSchema);
+        }
+
+        /**
+         * Use a {@link PubSubDeserializationSchema} for the {@link 
PubSubSource}.
+         *
+         * @param deserializationSchema The deserialization schema to use.
+         */
+        private PubSubSourceBuilder(PubSubDeserializationSchema<OUT> 
deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = deserializationSchema;
+        }
+
+        @Override
+        public SubscriptionNameBuilder<OUT> withProjectName(String 
projectName) {
+            Preconditions.checkNotNull(projectName);
+            this.projectName = projectName;
+            return this;
+        }
+
+        @Override
+        public PubSubSourceBuilder<OUT> withSubscriptionName(String 
subscriptionName) {
+            Preconditions.checkNotNull(subscriptionName);
+            this.subscriptionName = subscriptionName;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withCredentials(Credentials 
credentials) {
+            this.credentials = credentials;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+                PubSubSubscriberFactory pubSubSubscriberFactory) {
+            this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+                int maxMessagesPerPull, Duration perRequestTimeout, int 
retries) {
+            this.pubSubSubscriberFactory =
+                    new DefaultPubSubSubscriberFactory(
+                            ProjectSubscriptionName.format(projectName, 
subscriptionName),
+                            retries,
+                            perRequestTimeout,
+                            maxMessagesPerPull);
+            return this;
+        }
+
+        public PubSubSourceBuilder setProps(Properties props) {
+            this.props = props;
+            return this;
+        }
+
+        public PubSubSource<OUT> build() throws IOException {
+            if (credentials == null) {
+                credentials = 
defaultCredentialsProviderBuilder().build().getCredentials();
+            }
+
+            if (pubSubSubscriberFactory == null) {
+                pubSubSubscriberFactory =
+                        new DefaultPubSubSubscriberFactory(
+                                ProjectSubscriptionName.format(projectName, 
subscriptionName),
+                                3,
+                                Duration.ofSeconds(15),
+                                100);
+            }
+
+            return new PubSubSource(
+                    deserializationSchema, pubSubSubscriberFactory, props, 
credentials);
+        }
+    }
+
+    /** Part of {@link PubSubSourceBuilder} to set required fields. */
+    public static class DeserializationSchemaBuilder {
+        /**
+         * Set the DeserializationSchema used to deserialize incoming 
PubSubMessages. If you want
+         * access to meta data of a PubSubMessage use the overloaded
+         * withDeserializationSchema({@link PubSubDeserializationSchema}) 
method instead.
+         */
+        public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
+                DeserializationSchema<OUT> deserializationSchema) {
+            return new PubSubSourceBuilder<>(deserializationSchema);
+        }
+
+        /** Set the DeserializationSchema used to deserialize incoming 
PubSubMessages. */
+        public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
+                PubSubDeserializationSchema<OUT> deserializationSchema) {
+            return new PubSubSourceBuilder<>(deserializationSchema);

Review comment:
       I do no think this is a good way to expose it to the user. IMO you have 
to define your own deserialization Schema which is facilitated in your source 
and only let users implement this one. 

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md
##########
@@ -0,0 +1,78 @@
+# Flink Source for Google Cloud Pub/Sub
+
+This is a source implementation for receiving Google Cloud Pub/Sub messages in 
Flink with an at-least-once guarantee.
+
+## Installation
+
+Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub 
source:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-gcp-pubsub_2.11</artifactId>
+  <version>1.13-SNAPSHOT</version>
+</dependency>
+```
+
+## Usage
+
+Please keep in mind that the new source can be found in package 
`org.apache.flink.streaming.connectors.gcp.pubsub.source` while the old source 
implementation can still be found in 
`org.apache.flink.streaming.connectors.gcp.pubsub` (without `source` suffix). 
The API of the new source is not much different from the old one.

Review comment:
       I would not put this in here because eventually we want to remove the 
old connector when this is merged.

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+/**
+ * A custom {@link SingleThreadFetcherManager} so that the reception of GCP 
Pub/Sub messages can be
+ * acknowledged towards GCP Pub/Sub once they have been successfully 
checkpointed in Flink. As long
+ * as a received message has not been acknowledged, GCP Pub/Sub will attempt 
to deliver it again.
+ */
+public class PubSubSourceFetcherManager<T>
+        extends SingleThreadFetcherManager<Tuple2<T, Long>, PubSubSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSourceFetcherManager.class);
+
+    public PubSubSourceFetcherManager(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, 
Long>>> elementsQueue,
+            Supplier<SplitReader<Tuple2<T, Long>, PubSubSplit>> 
splitReaderSupplier) {
+        super(elementsQueue, splitReaderSupplier);
+    }
+
+    /**
+     * Creates a {@link SplitFetcher} if there's none available yet and 
enqueues a task to
+     * acknowledge GCP Pub/Sub messages.
+     */
+    public void acknowledgeMessages() {
+        SplitFetcher<Tuple2<T, Long>, PubSubSplit> splitFetcher = 
fetchers.get(0);
+
+        if (splitFetcher != null) {
+            enqueueAcknowledgeMessageTask(splitFetcher);
+        } else {
+            splitFetcher = createSplitFetcher();
+            enqueueAcknowledgeMessageTask(splitFetcher);
+            startFetcher(splitFetcher);
+        }
+    }
+
+    /**
+     * Enqueues a task that, when run, notifies a {@link PubSubSplitReader} of 
a successful
+     * checkpoint so that GCP Pub/Sub messages received since the previous 
checkpoint can be
+     * acknowledged.
+     *
+     * @param splitFetcher the split fetcher on which the acknowledge task 
should be enqueued.
+     */
+    private void enqueueAcknowledgeMessageTask(
+            SplitFetcher<Tuple2<T, Long>, PubSubSplit> splitFetcher) {
+        PubSubSplitReader<T> pubSubSplitReader =
+                (PubSubSplitReader<T>) splitFetcher.getSplitReader();
+
+        splitFetcher.enqueueTask(
+                new SplitFetcherTask() {
+                    @Override
+                    public boolean run() throws IOException {
+                        pubSubSplitReader.notifyCheckpointComplete();

Review comment:
       Can this call fail?

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSplitReader.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.util.Collector;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, 
PubSubSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSplitReader.class);
+    private final PubSubSubscriber subscriber;
+    private final PubSubDeserializationSchema<T> deserializationSchema;
+    private final PubSubCollector collector;
+    // Store the IDs of GCP Pub/Sub messages that yet have to be acknowledged 
so that they are not
+    // resent.
+    private final List<String> messageIdsToAcknowledge = new ArrayList<>();

Review comment:
       This has to be some sort of synchronized list. It is acessed by the 
fetcher thread and the reader thread.

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorCheckpointSerializer.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/**
+ * A stub to serialize the contents of a {@link PubSubEnumeratorCheckpoint}. 
Because no data is
+ * stored in such a checkpoint, no proper serialization is necessary.
+ */
+public class PubSubEnumeratorCheckpointSerializer
+        implements SimpleVersionedSerializer<PubSubEnumeratorCheckpoint> {
+
+    private static final int CURRENT_VERSION = 0;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PubSubEnumeratorCheckpoint enumeratorCheckpoint) 
throws IOException {
+        return new byte[0];
+    }
+
+    @Override
+    public PubSubEnumeratorCheckpoint deserialize(int version, byte[] 
serialized)

Review comment:
       To allow state evolution in the future it is important to check the 
version 
https://github.com/apache/flink/blob/bebf3b5a105dd4bc21882116570c6d71299269a6/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java#L52.
   
   A simple check is enough that the version is equal to the one we are 
expecting.

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitSerializer.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source.split;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/**
+ * A stub to serialize instances of {@link PubSubSplit}. No real 
deserialization or serialization is
+ * carried out because of how generic the {@link PubSubSplit} is.
+ */
+public class PubSubSplitSerializer implements 
SimpleVersionedSerializer<PubSubSplit> {
+    private static final int CURRENT_VERSION = 0;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PubSubSplit obj) throws IOException {
+        return new byte[0];
+    }
+
+    @Override
+    public PubSubSplit deserialize(int version, byte[] serialized) throws 
IOException {

Review comment:
       Same with the other serializer please do a version check.

##########
File path: 
flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
+import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.streaming.connectors.gcp.pubsub.SimpleStringSchemaWithStopMarkerDetection.STOP_MARKER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */
+public class EmulatedPubSubNewSourceTest extends GCloudUnitTestBase {

Review comment:
       Maybe add another test to assure the behavior on failure?

##########
File path: flink-connectors/flink-connector-gcp-pubsub/pom.xml
##########
@@ -53,6 +53,11 @@ under the License.
        </dependencyManagement>
 
        <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-base</artifactId>
+                       <version>${project.version}</version>

Review comment:
       Why do you need this dependency?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to