This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f2494cec219ad48f64993a04bec5d20464c11c94
Author: Jiangjie (Becket) Qin <becket....@gmail.com>
AuthorDate: Fri Jul 5 01:05:41 2019 +0800

    [FLINK-9311] [pubsub] Improvements to builders + minor improvement to 
PubSubSink flush logic
---
 docs/dev/connectors/pubsub.md                      |  8 +--
 .../connectors/gcp/pubsub/PubSubSink.java          | 60 ++++++++++++++--------
 .../connectors/gcp/pubsub/PubSubSource.java        | 29 +++++------
 .../gcp/pubsub/EmulatedPubSubSinkTest.java         |  4 +-
 .../gcp/pubsub/EmulatedPubSubSourceTest.java       |  2 +-
 .../examples/gcp/pubsub/PubSubExample.java         |  4 +-
 6 files changed, 61 insertions(+), 46 deletions(-)

diff --git a/docs/dev/connectors/pubsub.md b/docs/dev/connectors/pubsub.md
index bfa6f4a..7c14cd5 100644
--- a/docs/dev/connectors/pubsub.md
+++ b/docs/dev/connectors/pubsub.md
@@ -62,7 +62,7 @@ Example:
 StreamExecutionEnvironment streamExecEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
 DeserializationSchema<SomeObject> deserializer = (...);
-SourceFunction<SomeObject> pubsubSource = 
PubSubSource.newBuilder(SomeObject.class)
+SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
                                                       
.withDeserializationSchema(deserializer)
                                                       
.withProjectName("project")
                                                       
.withSubscriptionName("subscription")
@@ -89,7 +89,7 @@ Example:
 DataStream<SomeObject> dataStream = (...);
 
 SerializationSchema<SomeObject> serializationSchema = (...);
-SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder(SomeObject.class)
+SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
                                                 
.withDeserializationSchema(deserializer)
                                                 .withProjectName("project")
                                                 
.withSubscriptionName("subscription")
@@ -117,13 +117,13 @@ The following example shows how you would create a source 
to read messages from
 <div data-lang="java" markdown="1">
 {% highlight java %}
 DeserializationSchema<SomeObject> deserializationSchema = (...);
-SourceFunction<SomeObject> pubsubSource = 
PubSubSource.newBuilder(SomeObject.class)
+SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
                                                       
.withDeserializationSchema(deserializationSchema)
                                                       
.withProjectName("my-fake-project")
                                                       
.withSubscriptionName("subscription")
                                                       
.withPubSubSubscriberFactory(new 
PubSubSubscriberFactoryForEmulator("localhost:1234", "my-fake-project", 
"subscription", 10, Duration.ofSeconds(15), 100))
                                                       .build();
-SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder(SomeObject.class)
+SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
                                                 
.withDeserializationSchema(deserializationSchema)
                                                 
.withProjectName("my-fake-project")
                                                 
.withSubscriptionName("subscription")
diff --git 
a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
 
b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
index d280dc9..f314d3e 100644
--- 
a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
+++ 
b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
@@ -46,8 +46,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
@@ -63,7 +63,7 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> 
implements Checkpointed
 
        private final AtomicReference<Exception> exceptionAtomicReference;
        private final ApiFutureCallback<String> failureHandler;
-       private final ConcurrentLinkedQueue<ApiFuture<String>> 
outstandingFutures;
+       private final AtomicInteger numPendingFutures;
        private final Credentials credentials;
        private final SerializationSchema<IN> serializationSchema;
        private final String projectName;
@@ -81,7 +81,7 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> 
implements Checkpointed
                String hostAndPortForEmulator) {
                this.exceptionAtomicReference = new AtomicReference<>();
                this.failureHandler = new FailureHandler();
-               this.outstandingFutures = new ConcurrentLinkedQueue<>();
+               this.numPendingFutures = new AtomicInteger(0);
                this.credentials = credentials;
                this.serializationSchema = serializationSchema;
                this.projectName = projectName;
@@ -162,18 +162,17 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> 
implements Checkpointed
                        .build();
 
                ApiFuture<String> future = publisher.publish(pubsubMessage);
-               outstandingFutures.add(future);
+               numPendingFutures.incrementAndGet();
                ApiFutures.addCallback(future, failureHandler, 
directExecutor());
        }
 
        /**
         * Create a builder for a new PubSubSink.
         *
-        * @param <IN> The generic of the type that is to be written into the 
sink.
         * @return a new PubSubSinkBuilder instance
         */
-       public static <IN> SerializationSchemaBuilder<IN> newBuilder(Class<IN> 
clazz) {
-               return new PubSubSinkBuilder<>();
+       public static SerializationSchemaBuilder newBuilder() {
+               return new SerializationSchemaBuilder();
        }
 
        @Override
@@ -181,6 +180,8 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> 
implements Checkpointed
                //before checkpoints make sure all the batched / buffered 
pubsub messages have actually been sent
                publisher.publishAllOutstanding();
 
+               // At this point, no new messages will be published because 
this thread has successfully acquired
+               // the checkpoint lock. So we just wait for all the pending 
futures to complete.
                waitForFuturesToComplete();
                if (exceptionAtomicReference.get() != null) {
                        throw exceptionAtomicReference.get();
@@ -188,8 +189,17 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> 
implements Checkpointed
        }
 
        private void waitForFuturesToComplete() {
-               while (isRunning && !outstandingFutures.isEmpty()) {
-                       outstandingFutures.removeIf(ApiFuture::isDone);
+               // We have to synchronize on numPendingFutures here to ensure 
the notification won't be missed.
+               synchronized (numPendingFutures) {
+                       while (isRunning && numPendingFutures.get() > 0) {
+                               try {
+                                       numPendingFutures.wait();
+                               } catch (InterruptedException e) {
+                                       // Simply catch the interrupted 
exception. Supposedly the thread will exit the loop
+                                       // gracefully when it checks the 
isRunning flag.
+                                       LOG.info("Interrupted when waiting for 
futures to complete");
+                               }
+                       }
                }
        }
 
@@ -202,7 +212,7 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> 
implements Checkpointed
         *
         * @param <IN> Type of PubSubSink to create.
         */
-       public static class PubSubSinkBuilder<IN> implements 
SerializationSchemaBuilder<IN>, ProjectNameBuilder<IN>, TopicNameBuilder<IN> {
+       public static class PubSubSinkBuilder<IN> implements 
ProjectNameBuilder<IN>, TopicNameBuilder<IN> {
                private SerializationSchema<IN> serializationSchema;
                private String projectName;
                private String topicName;
@@ -210,7 +220,9 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> 
implements Checkpointed
                private Credentials credentials;
                private String hostAndPort;
 
-               private PubSubSinkBuilder() { }
+               private PubSubSinkBuilder(SerializationSchema<IN> 
serializationSchema) {
+                       this.serializationSchema = serializationSchema;
+               }
 
                /**
                 * Set the credentials.
@@ -225,13 +237,6 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> 
implements Checkpointed
                }
 
                @Override
-               public ProjectNameBuilder<IN> 
withSerializationSchema(SerializationSchema<IN> serializationSchema) {
-                       Preconditions.checkNotNull(serializationSchema);
-                       this.serializationSchema = serializationSchema;
-                       return this;
-               }
-
-               @Override
                public TopicNameBuilder<IN> withProjectName(String projectName) 
{
                        Preconditions.checkNotNull(projectName);
                        this.projectName = projectName;
@@ -275,11 +280,13 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> 
implements Checkpointed
        /**
         * Part of {@link PubSubSinkBuilder} to set required fields.
         */
-       public interface SerializationSchemaBuilder<IN> {
+       public static class SerializationSchemaBuilder {
                /**
                 * Set the SerializationSchema used to Serialize objects to be 
added as payloads of PubSubMessages.
                 */
-               ProjectNameBuilder<IN> 
withSerializationSchema(SerializationSchema<IN> deserializationSchema);
+               public <IN> ProjectNameBuilder<IN> 
withSerializationSchema(SerializationSchema<IN> deserializationSchema) {
+                       return new PubSubSinkBuilder<>(deserializationSchema);
+               }
        }
 
        /**
@@ -305,13 +312,24 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> 
implements Checkpointed
        private class FailureHandler implements ApiFutureCallback<String>, 
Serializable {
                @Override
                public void onFailure(Throwable t) {
+                       ackAndMaybeNotifyNoPendingFutures();
                        exceptionAtomicReference.set(new 
RuntimeException("Failed trying to publish message", t));
                }
 
                @Override
                public void onSuccess(String result) {
-                       //do nothing on success
+                       ackAndMaybeNotifyNoPendingFutures();
                        LOG.debug("Successfully published message with id: {}", 
result);
                }
+
+               private void ackAndMaybeNotifyNoPendingFutures() {
+                       // When there are no pending futures anymore, notify 
the thread that is waiting for
+                       // all the pending futures to be completed.
+                       if (numPendingFutures.decrementAndGet() == 0) {
+                               synchronized (numPendingFutures) {
+                                       numPendingFutures.notify();
+                               }
+                       }
+               }
        }
 }
diff --git 
a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
 
b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
index d093e0a..4ddd816 100644
--- 
a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
+++ 
b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
@@ -155,8 +155,8 @@ public class PubSubSource<OUT> extends 
RichSourceFunction<OUT>
                return deserializationSchema.getProducedType();
        }
 
-       public static <OUT> DeserializationSchemaBuilder<OUT> 
newBuilder(Class<OUT> clazz) {
-               return new PubSubSourceBuilder<>();
+       public static DeserializationSchemaBuilder newBuilder() {
+               return new DeserializationSchemaBuilder();
        }
 
        @Override
@@ -189,7 +189,7 @@ public class PubSubSource<OUT> extends 
RichSourceFunction<OUT>
         *
         * @param <OUT> The type of objects which will be read
         */
-       public static class PubSubSourceBuilder<OUT> implements 
DeserializationSchemaBuilder<OUT>, ProjectNameBuilder<OUT>, 
SubscriptionNameBuilder<OUT> {
+       public static class PubSubSourceBuilder<OUT> implements 
ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
                private PubSubDeserializationSchema<OUT> deserializationSchema;
                private String projectName;
                private String subscriptionName;
@@ -198,25 +198,18 @@ public class PubSubSource<OUT> extends 
RichSourceFunction<OUT>
                private Credentials credentials;
                private int maxMessageToAcknowledge = 10000;
 
-               protected PubSubSourceBuilder() {
-               }
-
-               @Override
-               public ProjectNameBuilder 
withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
+               private PubSubSourceBuilder(DeserializationSchema<OUT> 
deserializationSchema) {
                        Preconditions.checkNotNull(deserializationSchema);
                        this.deserializationSchema = new 
DeserializationSchemaWrapper<>(deserializationSchema);
-                       return this;
                }
 
-               @Override
-               public ProjectNameBuilder 
withDeserializationSchema(PubSubDeserializationSchema deserializationSchema) {
+               private PubSubSourceBuilder(PubSubDeserializationSchema<OUT> 
deserializationSchema) {
                        Preconditions.checkNotNull(deserializationSchema);
                        this.deserializationSchema = deserializationSchema;
-                       return this;
                }
 
                @Override
-               public SubscriptionNameBuilder withProjectName(String 
projectName) {
+               public SubscriptionNameBuilder<OUT> withProjectName(String 
projectName) {
                        Preconditions.checkNotNull(projectName);
                        this.projectName = projectName;
                        return this;
@@ -306,17 +299,21 @@ public class PubSubSource<OUT> extends 
RichSourceFunction<OUT>
        /**
         * Part of {@link PubSubSourceBuilder} to set required fields.
         */
-       public interface DeserializationSchemaBuilder<OUT> {
+       public static class DeserializationSchemaBuilder {
                /**
                 * Set the DeserializationSchema used to deserialize incoming 
PubSubMessages.
                 * If you want access to meta data of a PubSubMessage use the 
overloaded withDeserializationSchema({@link PubSubDeserializationSchema}) 
method instead.
                 */
-               ProjectNameBuilder<OUT> 
withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema);
+               public <OUT> ProjectNameBuilder<OUT> 
withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
+                       return new PubSubSourceBuilder<>(deserializationSchema);
+               }
 
                /**
                 * Set the DeserializationSchema used to deserialize incoming 
PubSubMessages.
                 */
-               ProjectNameBuilder<OUT> 
withDeserializationSchema(PubSubDeserializationSchema<OUT> 
deserializationSchema);
+               public <OUT> ProjectNameBuilder<OUT> 
withDeserializationSchema(PubSubDeserializationSchema<OUT> 
deserializationSchema) {
+                       return new PubSubSourceBuilder<>(deserializationSchema);
+               }
        }
 
        /**
diff --git 
a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
 
b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
index 0566e01..5c0c3b1 100644
--- 
a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
+++ 
b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
@@ -77,7 +77,7 @@ public class EmulatedPubSubSinkTest extends 
GCloudUnitTestBase {
 
                // Sink into pubsub
                theData
-                       .addSink(PubSubSink.newBuilder(String.class)
+                       .addSink(PubSubSink.newBuilder()
                                                                
.withSerializationSchema(new SimpleStringSchema())
                                                                
.withProjectName(PROJECT_NAME)
                                                                
.withTopicName(TOPIC_NAME)
@@ -116,7 +116,7 @@ public class EmulatedPubSubSinkTest extends 
GCloudUnitTestBase {
                env.addSource(new SingleInputSourceFunction())
 
                        .map((MapFunction<String, String>) StringUtils::reverse)
-                       .addSink(PubSubSink.newBuilder(String.class)
+                       .addSink(PubSubSink.newBuilder()
                                                                
.withSerializationSchema(new SimpleStringSchema())
                                                                
.withProjectName(PROJECT_NAME)
                                                                
.withTopicName(TOPIC_NAME)
diff --git 
a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
 
b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
index f3f9b0a..b28569c 100644
--- 
a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
+++ 
b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
@@ -96,7 +96,7 @@ public class EmulatedPubSubSourceTest extends 
GCloudUnitTestBase {
                env.setRestartStrategy(RestartStrategies.noRestart());
 
                DataStream<String> fromPubSub = env
-                       .addSource(PubSubSource.newBuilder(String.class)
+                       .addSource(PubSubSource.newBuilder()
                                                                
.withDeserializationSchema(new BoundedStringDeserializer(10))
                                                                
.withProjectName(PROJECT_NAME)
                                                                
.withSubscriptionName(SUBSCRIPTION_NAME)
diff --git 
a/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
 
b/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
index 7b66577..a960176 100644
--- 
a/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
+++ 
b/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
@@ -63,13 +63,13 @@ public class PubSubExample {
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.enableCheckpointing(1000L);
 
-               env.addSource(PubSubSource.newBuilder(Integer.class)
+               env.addSource(PubSubSource.newBuilder()
                                                                
.withDeserializationSchema(new IntegerSerializer())
                                                                
.withProjectName(projectName)
                                                                
.withSubscriptionName(subscriptionName)
                                                                .build())
                        .map(PubSubExample::printAndReturn).disableChaining()
-                       .addSink(PubSubSink.newBuilder(Integer.class)
+                       .addSink(PubSubSink.newBuilder()
                                                                
.withSerializationSchema(new IntegerSerializer())
                                                                
.withProjectName(projectName)
                                                                
.withTopicName(outputTopicName).build());

Reply via email to