jakob-ed commented on a change in pull request #15152:
URL: https://github.com/apache/flink/pull/15152#discussion_r598202068



##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md
##########
@@ -0,0 +1,78 @@
+# Flink Source for Google Cloud Pub/Sub
+
+This is a source implementation for receiving Google Cloud Pub/Sub messages in 
Flink with an at-least-once guarantee.
+
+## Installation
+
+Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub 
source:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-gcp-pubsub_2.11</artifactId>
+  <version>1.13-SNAPSHOT</version>
+</dependency>
+```
+
+## Usage
+
+Please keep in mind that the new source can be found in package 
`org.apache.flink.streaming.connectors.gcp.pubsub.source` while the old source 
implementation can still be found in 
`org.apache.flink.streaming.connectors.gcp.pubsub` (without `source` suffix). 
The API of the new source is not much different from the old one.
+
+To keep up the Google Cloud Pub/Sub at-least-once guarantee, messages are 
acknowledged against Pub/Sub when checkpointing succeeds. If a message is not 
acknowledged within a timeout (here `Duration.ofSeconds(1)`), Pub/Sub will 
attempt redelivery. To avoid unnecessary redelivery of successfully received 
messages, the timeout after which the reception of a message is deemed a 
failure (acknowledge deadline) should always be configured (much) *higher* than 
the checkpointing interval!
+
+```java
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource;
+
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Checkpointing must be enabled for the source to work so that messages can 
be acknowledged towards Pub/Sub
+env.enableCheckpointing(1000);
+
+// Parallelism > 1 may be set
+// env.setParallelism(4);
+
+PubSubSource<String> source =
+        PubSubSource.newBuilder()
+                // The deserialization schema to deserialize Pub/Sub messages
+                .withDeserializationSchema(new SimpleStringSchema())
+                // The name string of your Pub/Sub project
+                .withProjectName(PROJECT_NAME)
+                // The name string of the subscription you would like to 
receive messages from
+                .withSubscriptionName(SUBSCRIPTION_NAME)
+                // An instance of the com.google.auth.Credentials class to 
authenticate against Google Cloud
+                .withCredentials(CREDENTIALS)
+                .withPubSubSubscriberFactory(
+                        // The maximum number of messages that should be 
pulled in one go
+                        3,
+                        // The timeout after which the reception of a message 
is deemed a failure
+                        Duration.ofSeconds(1),
+                        // The number of times the reception of a message 
should be retried in case of failure
+                        10)
+                .setProps(new Properties())
+                .build();
+
+DataStream<String> fromPubSub =
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"pubsub-source");
+```
+
+## Internals
+
+#### Split Enumerator
+
+Pub/Sub doesn't expose any partitions to consuming applications. Therefore, 
the implementation of the `PubSubSourceEnumerator` doesn't do any real work 
discovery. Instead, a static `PubSubSplit` is handed to every 
`PubSubSourceReader` which requests a source split. This static source split 
doesn't contain split-specific information like partitions/offsets because this 
information can not be obtained.
+
+#### Source Reader
+
+A `PubSubSourceReader` uses Pub/Sub's pull mechanism to read new messages from 
the Pub/Sub subscription specified by the user. In the case of parallel-running 
source readers in Flink, every source reader is passed the same source split 
from the enumerator. Because of this, all source readers use the same 
connection details and the same Pub/Sub subscription to receive messages. In 
this case, Pub/Sub automatically load-balances messages between all source 
readers which pull from the same subscription. This way, parallel processing is 
achieved in the source.
+
+#### At-least-once guarantee
+
+Pub/Sub only guarantees at-least-once message delivery. This guarantee is kept 
up by the source as well. The mechanism that is used to achieve this is that 
Pub/Sub expects a message to be acknowledged by the subscriber to signal that 
the message has been consumed successfully. Any message that has not been 
acknowledged yet will be automatically redelivered by Pub/Sub once an ack 
deadline has passed.
+
+When a new checkpoint is written...
+- all messages pulled since the previous checkpoint are acknowledged to 
Pub/Sub and...
+- are forwarded to down-stream tasks

Review comment:
       Sorry, didn't rework that part of the description enough, it stems from 
when we had a worse understanding of the data flow and checkpointing 😅  So 
actually in the PubSubSource, messages are constantly forwarded to downstream 
tasks and not just after successful checkpointing. This can also be observed 
when we set the checkpointing interval very high and messages reach downstream 
tasks before the first checkpoint is triggered.

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A 
{@link PubSubSource} can
+ * be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.newBuilder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .withDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .withProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive 
messages from
+ *         .withSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of the com.google.auth.Credentials class to 
authenticate against Google Cloud
+ *         .withCredentials(CREDENTIALS)
+ *         .withPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in 
one go
+ *                 3,
+ *                 // The timeout after which the reception of a message is 
deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be 
retried in case of failure
+ *                 10)
+ *         .setProps(new Properties())
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorCheckpoint>, 
ResultTypeQueryable<OUT> {
+    protected final PubSubDeserializationSchema<OUT> deserializationSchema;
+    protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    PubSubSource(
+            PubSubDeserializationSchema<OUT> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Properties props,
+            Credentials credentials) {
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.props = props;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext 
readerContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+                () -> {
+                    try {
+                        return new PubSubSplitReader<>(
+                                deserializationSchema,
+                                
pubSubSubscriberFactory.getSubscriber(credentials));
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        return null;
+                    }
+                };
+        PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+        return new PubSubSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                toConfiguration(props),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
createEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
restoreEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext,
+            PubSubEnumeratorCheckpoint checkpoint) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+        return new PubSubSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubEnumeratorCheckpoint>
+            getEnumeratorCheckpointSerializer() {
+        return new PubSubEnumeratorCheckpointSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    /**
+     * Get a builder to build a {@link PubSubSource}.
+     *
+     * @return A builder for a @{link PubSubSource}.
+     */
+    public static DeserializationSchemaBuilder newBuilder() {
+        return new DeserializationSchemaBuilder();
+    }
+
+    /** @param <OUT> */
+    public static class PubSubSourceBuilder<OUT>
+            implements ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
+        private final PubSubDeserializationSchema<OUT> deserializationSchema;
+        private String projectName;
+        private String subscriptionName;
+
+        private PubSubSubscriberFactory pubSubSubscriberFactory;
+        private Properties props;
+        private Credentials credentials;
+
+        /**
+         * Use any {@link DeserializationSchema} to use in the {@link 
PubSubSource}. The schema will
+         * be wrapped automatically for compatibility with the source.
+         *
+         * @param deserializationSchema The deserialization schema to use.
+         */
+        private PubSubSourceBuilder(DeserializationSchema<OUT> 
deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = new 
DeserializationSchemaWrapper<>(deserializationSchema);
+        }
+
+        /**
+         * Use a {@link PubSubDeserializationSchema} for the {@link 
PubSubSource}.
+         *
+         * @param deserializationSchema The deserialization schema to use.
+         */
+        private PubSubSourceBuilder(PubSubDeserializationSchema<OUT> 
deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = deserializationSchema;
+        }
+
+        @Override
+        public SubscriptionNameBuilder<OUT> withProjectName(String 
projectName) {
+            Preconditions.checkNotNull(projectName);
+            this.projectName = projectName;
+            return this;
+        }
+
+        @Override
+        public PubSubSourceBuilder<OUT> withSubscriptionName(String 
subscriptionName) {
+            Preconditions.checkNotNull(subscriptionName);
+            this.subscriptionName = subscriptionName;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withCredentials(Credentials 
credentials) {
+            this.credentials = credentials;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+                PubSubSubscriberFactory pubSubSubscriberFactory) {
+            this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+                int maxMessagesPerPull, Duration perRequestTimeout, int 
retries) {
+            this.pubSubSubscriberFactory =
+                    new DefaultPubSubSubscriberFactory(
+                            ProjectSubscriptionName.format(projectName, 
subscriptionName),
+                            retries,
+                            perRequestTimeout,
+                            maxMessagesPerPull);
+            return this;
+        }
+
+        public PubSubSourceBuilder setProps(Properties props) {
+            this.props = props;
+            return this;
+        }
+
+        public PubSubSource<OUT> build() throws IOException {
+            if (credentials == null) {
+                credentials = 
defaultCredentialsProviderBuilder().build().getCredentials();
+            }
+
+            if (pubSubSubscriberFactory == null) {
+                pubSubSubscriberFactory =
+                        new DefaultPubSubSubscriberFactory(
+                                ProjectSubscriptionName.format(projectName, 
subscriptionName),
+                                3,
+                                Duration.ofSeconds(15),
+                                100);
+            }
+
+            return new PubSubSource(
+                    deserializationSchema, pubSubSubscriberFactory, props, 
credentials);
+        }
+    }
+
+    /** Part of {@link PubSubSourceBuilder} to set required fields. */
+    public static class DeserializationSchemaBuilder {
+        /**
+         * Set the DeserializationSchema used to deserialize incoming 
PubSubMessages. If you want
+         * access to meta data of a PubSubMessage use the overloaded
+         * withDeserializationSchema({@link PubSubDeserializationSchema}) 
method instead.
+         */
+        public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
+                DeserializationSchema<OUT> deserializationSchema) {
+            return new PubSubSourceBuilder<>(deserializationSchema);
+        }
+
+        /** Set the DeserializationSchema used to deserialize incoming 
PubSubMessages. */
+        public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
+                PubSubDeserializationSchema<OUT> deserializationSchema) {
+            return new PubSubSourceBuilder<>(deserializationSchema);
+        }
+    }
+
+    /** Part of {@link PubSubSourceBuilder} to set required fields. */
+    public interface ProjectNameBuilder<OUT> {
+        /** Set the project name of the subscription to pull messages from. */
+        SubscriptionNameBuilder<OUT> withProjectName(String projectName);
+    }
+
+    /** Part of {@link PubSubSourceBuilder} to set required fields. */
+    public interface SubscriptionNameBuilder<OUT> {
+        /** Set the subscription name of the subscription to pull messages 
from. */
+        PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);
+    }

Review comment:
       I refactored the builder now a bit. You are right, there was no point in 
the interfaces.

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md
##########
@@ -0,0 +1,78 @@
+# Flink Source for Google Cloud Pub/Sub
+
+This is a source implementation for receiving Google Cloud Pub/Sub messages in 
Flink with an at-least-once guarantee.
+
+## Installation
+
+Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub 
source:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-gcp-pubsub_2.11</artifactId>
+  <version>1.13-SNAPSHOT</version>
+</dependency>
+```
+
+## Usage
+
+Please keep in mind that the new source can be found in package 
`org.apache.flink.streaming.connectors.gcp.pubsub.source` while the old source 
implementation can still be found in 
`org.apache.flink.streaming.connectors.gcp.pubsub` (without `source` suffix). 
The API of the new source is not much different from the old one.
+
+To keep up the Google Cloud Pub/Sub at-least-once guarantee, messages are 
acknowledged against Pub/Sub when checkpointing succeeds. If a message is not 
acknowledged within a timeout (here `Duration.ofSeconds(1)`), Pub/Sub will 
attempt redelivery. To avoid unnecessary redelivery of successfully received 
messages, the timeout after which the reception of a message is deemed a 
failure (acknowledge deadline) should always be configured (much) *higher* than 
the checkpointing interval!

Review comment:
       What do you mean exactly? If Pub/Sub takes a bit longer to receive the 
acknowledge request, it could be that the next checkpoint gets already 
triggered before previous messages are acknowledged. But that shouldn't have an 
effect on consistency guarantees. If the acknowledgement to Pub/Sub fails or 
Pub/Sub registers that the ack deadline has been surpassed, there would be 
redelivery of messages but the at-least-once guarantee would still hold.

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A 
{@link PubSubSource} can
+ * be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.newBuilder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .withDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .withProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive 
messages from
+ *         .withSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of the com.google.auth.Credentials class to 
authenticate against Google Cloud
+ *         .withCredentials(CREDENTIALS)
+ *         .withPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in 
one go
+ *                 3,
+ *                 // The timeout after which the reception of a message is 
deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be 
retried in case of failure
+ *                 10)
+ *         .setProps(new Properties())
+ *         .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorCheckpoint>, 
ResultTypeQueryable<OUT> {
+    protected final PubSubDeserializationSchema<OUT> deserializationSchema;
+    protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    PubSubSource(
+            PubSubDeserializationSchema<OUT> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Properties props,
+            Credentials credentials) {
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.props = props;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext 
readerContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+                () -> {
+                    try {
+                        return new PubSubSplitReader<>(
+                                deserializationSchema,
+                                
pubSubSubscriberFactory.getSubscriber(credentials));
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        return null;
+                    }
+                };
+        PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+        return new PubSubSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                toConfiguration(props),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
createEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint> 
restoreEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext,
+            PubSubEnumeratorCheckpoint checkpoint) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+        return new PubSubSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubEnumeratorCheckpoint>
+            getEnumeratorCheckpointSerializer() {
+        return new PubSubEnumeratorCheckpointSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    /**
+     * Get a builder to build a {@link PubSubSource}.
+     *
+     * @return A builder for a @{link PubSubSource}.
+     */
+    public static DeserializationSchemaBuilder newBuilder() {
+        return new DeserializationSchemaBuilder();
+    }
+
+    /** @param <OUT> */
+    public static class PubSubSourceBuilder<OUT>

Review comment:
       Refactored the builder now and as far as I can see it will have to stay 
public.

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md
##########
@@ -0,0 +1,78 @@
+# Flink Source for Google Cloud Pub/Sub
+
+This is a source implementation for receiving Google Cloud Pub/Sub messages in 
Flink with an at-least-once guarantee.
+
+## Installation
+
+Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub 
source:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-gcp-pubsub_2.11</artifactId>
+  <version>1.13-SNAPSHOT</version>
+</dependency>
+```
+
+## Usage
+
+Please keep in mind that the new source can be found in package 
`org.apache.flink.streaming.connectors.gcp.pubsub.source` while the old source 
implementation can still be found in 
`org.apache.flink.streaming.connectors.gcp.pubsub` (without `source` suffix). 
The API of the new source is not much different from the old one.

Review comment:
       Agree, removed it

##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSplitReader.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.util.Collector;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, 
PubSubSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSplitReader.class);
+    private final PubSubSubscriber subscriber;
+    private final PubSubDeserializationSchema<T> deserializationSchema;
+    private final PubSubCollector collector;
+    // Store the IDs of GCP Pub/Sub messages that yet have to be acknowledged 
so that they are not
+    // resent.
+    private final List<String> messageIdsToAcknowledge = new ArrayList<>();

Review comment:
       Good catch, thank you. I discovered that there was even a bit of a 
bigger problem: Because the list stores IDs of messages received "under 
multiple checkpoints" and because of concurrency, it was possible to 
acknowledge messages that haven't been checkpointed yet. However, in the new 
solution there's still some race condition that leads to message loss in the 
case of failure and recovery from checkpoint.

##########
File path: 
flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
+import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.streaming.connectors.gcp.pubsub.SimpleStringSchemaWithStopMarkerDetection.STOP_MARKER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */
+public class EmulatedPubSubNewSourceTest extends GCloudUnitTestBase {

Review comment:
       Added a test now but the results don't look good.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to