jakob-ed commented on a change in pull request #15152:
URL: https://github.com/apache/flink/pull/15152#discussion_r598202068
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md
##########
@@ -0,0 +1,78 @@
+# Flink Source for Google Cloud Pub/Sub
+
+This is a source implementation for receiving Google Cloud Pub/Sub messages in
Flink with an at-least-once guarantee.
+
+## Installation
+
+Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub
source:
+
+```xml
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-gcp-pubsub_2.11</artifactId>
+ <version>1.13-SNAPSHOT</version>
+</dependency>
+```
+
+## Usage
+
+Please keep in mind that the new source can be found in package
`org.apache.flink.streaming.connectors.gcp.pubsub.source` while the old source
implementation can still be found in
`org.apache.flink.streaming.connectors.gcp.pubsub` (without `source` suffix).
The API of the new source is not much different from the old one.
+
+To keep up the Google Cloud Pub/Sub at-least-once guarantee, messages are
acknowledged against Pub/Sub when checkpointing succeeds. If a message is not
acknowledged within a timeout (here `Duration.ofSeconds(1)`), Pub/Sub will
attempt redelivery. To avoid unnecessary redelivery of successfully received
messages, the timeout after which the reception of a message is deemed a
failure (acknowledge deadline) should always be configured (much) *higher* than
the checkpointing interval!
+
+```java
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource;
+
+StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Checkpointing must be enabled for the source to work so that messages can
be acknowledged towards Pub/Sub
+env.enableCheckpointing(1000);
+
+// Parallelism > 1 may be set
+// env.setParallelism(4);
+
+PubSubSource<String> source =
+ PubSubSource.newBuilder()
+ // The deserialization schema to deserialize Pub/Sub messages
+ .withDeserializationSchema(new SimpleStringSchema())
+ // The name string of your Pub/Sub project
+ .withProjectName(PROJECT_NAME)
+ // The name string of the subscription you would like to
receive messages from
+ .withSubscriptionName(SUBSCRIPTION_NAME)
+ // An instance of the com.google.auth.Credentials class to
authenticate against Google Cloud
+ .withCredentials(CREDENTIALS)
+ .withPubSubSubscriberFactory(
+ // The maximum number of messages that should be
pulled in one go
+ 3,
+ // The timeout after which the reception of a message
is deemed a failure
+ Duration.ofSeconds(1),
+ // The number of times the reception of a message
should be retried in case of failure
+ 10)
+ .setProps(new Properties())
+ .build();
+
+DataStream<String> fromPubSub =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"pubsub-source");
+```
+
+## Internals
+
+#### Split Enumerator
+
+Pub/Sub doesn't expose any partitions to consuming applications. Therefore,
the implementation of the `PubSubSourceEnumerator` doesn't do any real work
discovery. Instead, a static `PubSubSplit` is handed to every
`PubSubSourceReader` which requests a source split. This static source split
doesn't contain split-specific information like partitions/offsets because this
information can not be obtained.
+
+#### Source Reader
+
+A `PubSubSourceReader` uses Pub/Sub's pull mechanism to read new messages from
the Pub/Sub subscription specified by the user. In the case of parallel-running
source readers in Flink, every source reader is passed the same source split
from the enumerator. Because of this, all source readers use the same
connection details and the same Pub/Sub subscription to receive messages. In
this case, Pub/Sub automatically load-balances messages between all source
readers which pull from the same subscription. This way, parallel processing is
achieved in the source.
+
+#### At-least-once guarantee
+
+Pub/Sub only guarantees at-least-once message delivery. This guarantee is kept
up by the source as well. The mechanism that is used to achieve this is that
Pub/Sub expects a message to be acknowledged by the subscriber to signal that
the message has been consumed successfully. Any message that has not been
acknowledged yet will be automatically redelivered by Pub/Sub once an ack
deadline has passed.
+
+When a new checkpoint is written...
+- all messages pulled since the previous checkpoint are acknowledged to
Pub/Sub and...
+- are forwarded to down-stream tasks
Review comment:
Sorry, I didn't rework that part of the description enough; it stems from
when we had a worse understanding of the data flow and checkpointing 😅 In
the PubSubSource, messages are actually forwarded to downstream tasks
continuously, not only after a successful checkpoint. This can be observed by
setting the checkpointing interval very high: messages reach downstream tasks
before the first checkpoint is triggered.
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A
{@link PubSubSource} can
+ * be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.newBuilder()
+ * // The deserialization schema to deserialize Pub/Sub messages
+ * .withDeserializationSchema(new SimpleStringSchema())
+ * // The name string of your Pub/Sub project
+ * .withProjectName(PROJECT_NAME)
+ * // The name string of the subscription you would like to receive
messages from
+ * .withSubscriptionName(SUBSCRIPTION_NAME)
+ * // An instance of the com.google.auth.Credentials class to
authenticate against Google Cloud
+ * .withCredentials(CREDENTIALS)
+ * .withPubSubSubscriberFactory(
+ * // The maximum number of messages that should be pulled in
one go
+ * 3,
+ * // The timeout after which the reception of a message is
deemed a failure
+ * Duration.ofSeconds(1),
+ * // The number of times the reception of a message should be
retried in case of failure
+ * 10)
+ * .setProps(new Properties())
+ * .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+ implements Source<OUT, PubSubSplit, PubSubEnumeratorCheckpoint>,
ResultTypeQueryable<OUT> {
+ protected final PubSubDeserializationSchema<OUT> deserializationSchema;
+ protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+ private final Properties props;
+ private final Credentials credentials;
+
+ PubSubSource(
+ PubSubDeserializationSchema<OUT> deserializationSchema,
+ PubSubSubscriberFactory pubSubSubscriberFactory,
+ Properties props,
+ Credentials credentials) {
+ this.deserializationSchema = deserializationSchema;
+ this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+ this.props = props;
+ this.credentials = credentials;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ @Override
+ public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext
readerContext) {
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>>
elementsQueue =
+ new FutureCompletingBlockingQueue<>();
+ Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+ () -> {
+ try {
+ return new PubSubSplitReader<>(
+ deserializationSchema,
+
pubSubSubscriberFactory.getSubscriber(credentials));
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ };
+ PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+ return new PubSubSourceReader<>(
+ elementsQueue,
+ splitReaderSupplier,
+ recordEmitter,
+ toConfiguration(props),
+ readerContext);
+ }
+
+ @Override
+ public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint>
createEnumerator(
+ SplitEnumeratorContext<PubSubSplit> enumContext) {
+ return new PubSubSourceEnumerator(enumContext);
+ }
+
+ @Override
+ public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint>
restoreEnumerator(
+ SplitEnumeratorContext<PubSubSplit> enumContext,
+ PubSubEnumeratorCheckpoint checkpoint) {
+ return new PubSubSourceEnumerator(enumContext);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+ return new PubSubSplitSerializer();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<PubSubEnumeratorCheckpoint>
+ getEnumeratorCheckpointSerializer() {
+ return new PubSubEnumeratorCheckpointSerializer();
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+
+ /**
+ * Get a builder to build a {@link PubSubSource}.
+ *
+ * @return A builder for a {@link PubSubSource}.
+ */
+ public static DeserializationSchemaBuilder newBuilder() {
+ return new DeserializationSchemaBuilder();
+ }
+
+ /** @param <OUT> */
+ public static class PubSubSourceBuilder<OUT>
+ implements ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
+ private final PubSubDeserializationSchema<OUT> deserializationSchema;
+ private String projectName;
+ private String subscriptionName;
+
+ private PubSubSubscriberFactory pubSubSubscriberFactory;
+ private Properties props;
+ private Credentials credentials;
+
+ /**
+ * Use any {@link DeserializationSchema} to use in the {@link
PubSubSource}. The schema will
+ * be wrapped automatically for compatibility with the source.
+ *
+ * @param deserializationSchema The deserialization schema to use.
+ */
+ private PubSubSourceBuilder(DeserializationSchema<OUT>
deserializationSchema) {
+ Preconditions.checkNotNull(deserializationSchema);
+ this.deserializationSchema = new
DeserializationSchemaWrapper<>(deserializationSchema);
+ }
+
+ /**
+ * Use a {@link PubSubDeserializationSchema} for the {@link
PubSubSource}.
+ *
+ * @param deserializationSchema The deserialization schema to use.
+ */
+ private PubSubSourceBuilder(PubSubDeserializationSchema<OUT>
deserializationSchema) {
+ Preconditions.checkNotNull(deserializationSchema);
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public SubscriptionNameBuilder<OUT> withProjectName(String
projectName) {
+ Preconditions.checkNotNull(projectName);
+ this.projectName = projectName;
+ return this;
+ }
+
+ @Override
+ public PubSubSourceBuilder<OUT> withSubscriptionName(String
subscriptionName) {
+ Preconditions.checkNotNull(subscriptionName);
+ this.subscriptionName = subscriptionName;
+ return this;
+ }
+
+ public PubSubSourceBuilder<OUT> withCredentials(Credentials
credentials) {
+ this.credentials = credentials;
+ return this;
+ }
+
+ public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+ PubSubSubscriberFactory pubSubSubscriberFactory) {
+ this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+ return this;
+ }
+
+ public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
+ int maxMessagesPerPull, Duration perRequestTimeout, int
retries) {
+ this.pubSubSubscriberFactory =
+ new DefaultPubSubSubscriberFactory(
+ ProjectSubscriptionName.format(projectName,
subscriptionName),
+ retries,
+ perRequestTimeout,
+ maxMessagesPerPull);
+ return this;
+ }
+
+ public PubSubSourceBuilder setProps(Properties props) {
+ this.props = props;
+ return this;
+ }
+
+ public PubSubSource<OUT> build() throws IOException {
+ if (credentials == null) {
+ credentials =
defaultCredentialsProviderBuilder().build().getCredentials();
+ }
+
+ if (pubSubSubscriberFactory == null) {
+ pubSubSubscriberFactory =
+ new DefaultPubSubSubscriberFactory(
+ ProjectSubscriptionName.format(projectName,
subscriptionName),
+ 3,
+ Duration.ofSeconds(15),
+ 100);
+ }
+
+ return new PubSubSource(
+ deserializationSchema, pubSubSubscriberFactory, props,
credentials);
+ }
+ }
+
+ /** Part of {@link PubSubSourceBuilder} to set required fields. */
+ public static class DeserializationSchemaBuilder {
+ /**
+ * Set the DeserializationSchema used to deserialize incoming
PubSubMessages. If you want
+ * access to meta data of a PubSubMessage use the overloaded
+ * withDeserializationSchema({@link PubSubDeserializationSchema})
method instead.
+ */
+ public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
+ DeserializationSchema<OUT> deserializationSchema) {
+ return new PubSubSourceBuilder<>(deserializationSchema);
+ }
+
+ /** Set the DeserializationSchema used to deserialize incoming
PubSubMessages. */
+ public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
+ PubSubDeserializationSchema<OUT> deserializationSchema) {
+ return new PubSubSourceBuilder<>(deserializationSchema);
+ }
+ }
+
+ /** Part of {@link PubSubSourceBuilder} to set required fields. */
+ public interface ProjectNameBuilder<OUT> {
+ /** Set the project name of the subscription to pull messages from. */
+ SubscriptionNameBuilder<OUT> withProjectName(String projectName);
+ }
+
+ /** Part of {@link PubSubSourceBuilder} to set required fields. */
+ public interface SubscriptionNameBuilder<OUT> {
+ /** Set the subscription name of the subscription to pull messages
from. */
+ PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);
+ }
Review comment:
I have refactored the builder a bit. You are right, there was no point in
the interfaces.
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md
##########
@@ -0,0 +1,78 @@
+# Flink Source for Google Cloud Pub/Sub
+
+This is a source implementation for receiving Google Cloud Pub/Sub messages in
Flink with an at-least-once guarantee.
+
+## Installation
+
+Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub
source:
+
+```xml
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-gcp-pubsub_2.11</artifactId>
+ <version>1.13-SNAPSHOT</version>
+</dependency>
+```
+
+## Usage
+
+Please keep in mind that the new source can be found in package
`org.apache.flink.streaming.connectors.gcp.pubsub.source` while the old source
implementation can still be found in
`org.apache.flink.streaming.connectors.gcp.pubsub` (without `source` suffix).
The API of the new source is not much different from the old one.
+
+To keep up the Google Cloud Pub/Sub at-least-once guarantee, messages are
acknowledged against Pub/Sub when checkpointing succeeds. If a message is not
acknowledged within a timeout (here `Duration.ofSeconds(1)`), Pub/Sub will
attempt redelivery. To avoid unnecessary redelivery of successfully received
messages, the timeout after which the reception of a message is deemed a
failure (acknowledge deadline) should always be configured (much) *higher* than
the checkpointing interval!
Review comment:
What do you mean exactly? If Pub/Sub takes a bit longer to receive the
acknowledge request, the next checkpoint could already be triggered before the
previous messages are acknowledged. But that shouldn't affect the consistency
guarantees. If the acknowledgement to Pub/Sub fails, or Pub/Sub registers that
the ack deadline has passed, messages would be redelivered, but the
at-least-once guarantee would still hold.
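To make the trade-off concrete, here is a minimal sketch of the timing
argument. It assumes a simplified model in which a message pulled right after
a checkpoint is acknowledged only once the next checkpoint completes. The
class `AckDeadlineCheck`, its methods, and the 50 ms checkpoint duration are
hypothetical and not part of the connector. A short ack deadline in this model
only causes duplicates, not loss.

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not part of the connector. */
final class AckDeadlineCheck {

    private AckDeadlineCheck() {}

    /**
     * Worst case in this simplified model: a message is pulled right after a
     * checkpoint and has to wait a full interval plus the time the next
     * checkpoint takes before it can be acknowledged.
     */
    static Duration worstCaseAckDelay(Duration checkpointInterval, Duration checkpointDuration) {
        return checkpointInterval.plus(checkpointDuration);
    }

    /** True if the worst-case ack delay reaches or exceeds the ack deadline. */
    static boolean redeliveryLikely(
            Duration ackDeadline, Duration checkpointInterval, Duration checkpointDuration) {
        return worstCaseAckDelay(checkpointInterval, checkpointDuration)
                        .compareTo(ackDeadline)
                >= 0;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMillis(1000); // as in env.enableCheckpointing(1000)
        Duration checkpointTakes = Duration.ofMillis(50); // assumed value
        System.out.println(redeliveryLikely(Duration.ofSeconds(1), interval, checkpointTakes));
        System.out.println(redeliveryLikely(Duration.ofSeconds(10), interval, checkpointTakes));
    }
}
```

Under these assumptions, redelivered messages are processed again after the
deadline passes, so the at-least-once guarantee is unaffected; only the amount
of duplicate work changes.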
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpoint;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorCheckpointSerializer;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink. A
{@link PubSubSource} can
+ * be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ * <pre>{@code
+ * PubSubSource.newBuilder()
+ * // The deserialization schema to deserialize Pub/Sub messages
+ * .withDeserializationSchema(new SimpleStringSchema())
+ * // The name string of your Pub/Sub project
+ * .withProjectName(PROJECT_NAME)
+ * // The name string of the subscription you would like to receive
messages from
+ * .withSubscriptionName(SUBSCRIPTION_NAME)
+ * // An instance of the com.google.auth.Credentials class to
authenticate against Google Cloud
+ * .withCredentials(CREDENTIALS)
+ * .withPubSubSubscriberFactory(
+ * // The maximum number of messages that should be pulled in
one go
+ * 3,
+ * // The timeout after which the reception of a message is
deemed a failure
+ * Duration.ofSeconds(1),
+ * // The number of times the reception of a message should be
retried in case of failure
+ * 10)
+ * .setProps(new Properties())
+ * .build();
+ * }</pre>
+ *
+ * <p>More details can be found at {@link PubSubSourceBuilder}
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+ implements Source<OUT, PubSubSplit, PubSubEnumeratorCheckpoint>,
ResultTypeQueryable<OUT> {
+ protected final PubSubDeserializationSchema<OUT> deserializationSchema;
+ protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+ private final Properties props;
+ private final Credentials credentials;
+
+ PubSubSource(
+ PubSubDeserializationSchema<OUT> deserializationSchema,
+ PubSubSubscriberFactory pubSubSubscriberFactory,
+ Properties props,
+ Credentials credentials) {
+ this.deserializationSchema = deserializationSchema;
+ this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+ this.props = props;
+ this.credentials = credentials;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ @Override
+ public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext
readerContext) {
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>>
elementsQueue =
+ new FutureCompletingBlockingQueue<>();
+ Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+ () -> {
+ try {
+ return new PubSubSplitReader<>(
+ deserializationSchema,
+
pubSubSubscriberFactory.getSubscriber(credentials));
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ };
+ PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+ return new PubSubSourceReader<>(
+ elementsQueue,
+ splitReaderSupplier,
+ recordEmitter,
+ toConfiguration(props),
+ readerContext);
+ }
+
+ @Override
+ public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint>
createEnumerator(
+ SplitEnumeratorContext<PubSubSplit> enumContext) {
+ return new PubSubSourceEnumerator(enumContext);
+ }
+
+ @Override
+ public SplitEnumerator<PubSubSplit, PubSubEnumeratorCheckpoint>
restoreEnumerator(
+ SplitEnumeratorContext<PubSubSplit> enumContext,
+ PubSubEnumeratorCheckpoint checkpoint) {
+ return new PubSubSourceEnumerator(enumContext);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+ return new PubSubSplitSerializer();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<PubSubEnumeratorCheckpoint>
+ getEnumeratorCheckpointSerializer() {
+ return new PubSubEnumeratorCheckpointSerializer();
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+
+ /**
+ * Get a builder to build a {@link PubSubSource}.
+ *
+ * @return A builder for a {@link PubSubSource}.
+ */
+ public static DeserializationSchemaBuilder newBuilder() {
+ return new DeserializationSchemaBuilder();
+ }
+
+ /** @param <OUT> */
+ public static class PubSubSourceBuilder<OUT>
Review comment:
I have refactored the builder and, as far as I can see, it will have to stay
public.
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md
##########
@@ -0,0 +1,78 @@
+# Flink Source for Google Cloud Pub/Sub
+
+This is a source implementation for receiving Google Cloud Pub/Sub messages in
Flink with an at-least-once guarantee.
+
+## Installation
+
+Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub
source:
+
+```xml
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-gcp-pubsub_2.11</artifactId>
+ <version>1.13-SNAPSHOT</version>
+</dependency>
+```
+
+## Usage
+
+Please keep in mind that the new source can be found in package
`org.apache.flink.streaming.connectors.gcp.pubsub.source` while the old source
implementation can still be found in
`org.apache.flink.streaming.connectors.gcp.pubsub` (without `source` suffix).
The API of the new source is not much different from the old one.
Review comment:
Agree, removed it
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSplitReader.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit;
+import org.apache.flink.util.Collector;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link SplitReader} to read from a given {@link PubSubSubscriber}.
+ *
+ * @param <T> the type of the record.
+ */
+public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>,
PubSubSplit> {
+ private static final Logger LOG =
LoggerFactory.getLogger(PubSubSplitReader.class);
+ private final PubSubSubscriber subscriber;
+ private final PubSubDeserializationSchema<T> deserializationSchema;
+ private final PubSubCollector collector;
+ // Store the IDs of GCP Pub/Sub messages that yet have to be acknowledged
so that they are not
+ // resent.
+ private final List<String> messageIdsToAcknowledge = new ArrayList<>();
Review comment:
Good catch, thank you. I discovered an even bigger problem: because the list
stores IDs of messages received across multiple checkpoints, and because of
concurrency, it was possible to acknowledge messages that hadn't been
checkpointed yet. However, the new solution still has a race condition that
leads to message loss in the case of failure and recovery from a checkpoint.
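One common way to avoid acknowledging messages that are not yet covered by a
checkpoint is to bucket the IDs per checkpoint. The sketch below is not the
code in this PR and does not claim to fix the remaining race; the class
`CheckpointAckTracker` and its method names are hypothetical. It assumes that
an ID is recorded only once the message has actually been emitted downstream,
that `snapshot` is called from the reader's state snapshot, and that
`checkpointCompleted` is called from the checkpoint-complete notification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical helper for illustration only; not the implementation in this PR. */
final class CheckpointAckTracker {
    // IDs of messages emitted since the last snapshot; not covered by any checkpoint yet.
    private List<String> pending = new ArrayList<>();
    // IDs frozen at snapshot time, keyed by checkpoint ID, waiting for completion.
    private final SortedMap<Long, List<String>> snapshotted = new TreeMap<>();

    /** Record an ID once the message has been emitted downstream. */
    synchronized void messageEmitted(String ackId) {
        pending.add(ackId);
    }

    /** Freeze everything emitted so far under the given checkpoint ID. */
    synchronized void snapshot(long checkpointId) {
        snapshotted.put(checkpointId, pending);
        pending = new ArrayList<>();
    }

    /**
     * Return the IDs that are safe to acknowledge: those frozen under the
     * completed checkpoint or any earlier one. IDs recorded after the snapshot
     * stay behind until a later checkpoint completes.
     */
    synchronized List<String> checkpointCompleted(long checkpointId) {
        SortedMap<Long, List<String>> done = snapshotted.headMap(checkpointId + 1);
        List<String> toAck = new ArrayList<>();
        for (List<String> ids : done.values()) {
            toAck.addAll(ids);
        }
        done.clear(); // the view is backed by the map, so this removes the entries
        return toAck;
    }
}
```

With this layout, a message received between the snapshot of checkpoint 1 and
its completion is held back until checkpoint 2 completes, so it cannot be
acknowledged before it is part of a checkpoint.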
##########
File path:
flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
+import
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
+import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;
+import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import static
org.apache.flink.streaming.connectors.gcp.pubsub.SimpleStringSchemaWithStopMarkerDetection.STOP_MARKER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */
+public class EmulatedPubSubNewSourceTest extends GCloudUnitTestBase {
Review comment:
I have added a test now, but the results don't look good.