This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f2494cec219ad48f64993a04bec5d20464c11c94 Author: Jiangjie (Becket) Qin <becket....@gmail.com> AuthorDate: Fri Jul 5 01:05:41 2019 +0800 [FLINK-9311] [pubsub] Improvements to builders + minor improvement to PubSubSink flush logic --- docs/dev/connectors/pubsub.md | 8 +-- .../connectors/gcp/pubsub/PubSubSink.java | 60 ++++++++++++++-------- .../connectors/gcp/pubsub/PubSubSource.java | 29 +++++------ .../gcp/pubsub/EmulatedPubSubSinkTest.java | 4 +- .../gcp/pubsub/EmulatedPubSubSourceTest.java | 2 +- .../examples/gcp/pubsub/PubSubExample.java | 4 +- 6 files changed, 61 insertions(+), 46 deletions(-) diff --git a/docs/dev/connectors/pubsub.md b/docs/dev/connectors/pubsub.md index bfa6f4a..7c14cd5 100644 --- a/docs/dev/connectors/pubsub.md +++ b/docs/dev/connectors/pubsub.md @@ -62,7 +62,7 @@ Example: StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); DeserializationSchema<SomeObject> deserializer = (...); -SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder(SomeObject.class) +SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder() .withDeserializationSchema(deserializer) .withProjectName("project") .withSubscriptionName("subscription") @@ -89,7 +89,7 @@ Example: DataStream<SomeObject> dataStream = (...); SerializationSchema<SomeObject> serializationSchema = (...); -SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder(SomeObject.class) +SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder() .withDeserializationSchema(deserializer) .withProjectName("project") .withSubscriptionName("subscription") @@ -117,13 +117,13 @@ The following example shows how you would create a source to read messages from <div data-lang="java" markdown="1"> {% highlight java %} DeserializationSchema<SomeObject> deserializationSchema = (...); -SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder(SomeObject.class) +SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder() 
.withDeserializationSchema(deserializationSchema) .withProjectName("my-fake-project") .withSubscriptionName("subscription") .withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator("localhost:1234", "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100)) .build(); -SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder(SomeObject.class) +SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder() .withDeserializationSchema(deserializationSchema) .withProjectName("my-fake-project") .withSubscriptionName("subscription") diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java index d280dc9..f314d3e 100644 --- a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java +++ b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java @@ -46,8 +46,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; @@ -63,7 +63,7 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed private final AtomicReference<Exception> exceptionAtomicReference; private final ApiFutureCallback<String> failureHandler; - private final ConcurrentLinkedQueue<ApiFuture<String>> outstandingFutures; + private final AtomicInteger numPendingFutures; private final Credentials credentials; private final SerializationSchema<IN> serializationSchema; private final String projectName; @@ -81,7 
+81,7 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed String hostAndPortForEmulator) { this.exceptionAtomicReference = new AtomicReference<>(); this.failureHandler = new FailureHandler(); - this.outstandingFutures = new ConcurrentLinkedQueue<>(); + this.numPendingFutures = new AtomicInteger(0); this.credentials = credentials; this.serializationSchema = serializationSchema; this.projectName = projectName; @@ -162,18 +162,17 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed .build(); ApiFuture<String> future = publisher.publish(pubsubMessage); - outstandingFutures.add(future); + numPendingFutures.incrementAndGet(); ApiFutures.addCallback(future, failureHandler, directExecutor()); } /** * Create a builder for a new PubSubSink. * - * @param <IN> The generic of the type that is to be written into the sink. * @return a new PubSubSinkBuilder instance */ - public static <IN> SerializationSchemaBuilder<IN> newBuilder(Class<IN> clazz) { - return new PubSubSinkBuilder<>(); + public static SerializationSchemaBuilder newBuilder() { + return new SerializationSchemaBuilder(); } @Override @@ -181,6 +180,8 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed //before checkpoints make sure all the batched / buffered pubsub messages have actually been sent publisher.publishAllOutstanding(); + // At this point, no new messages will be published because this thread has successfully acquired + // the checkpoint lock. So we just wait for all the pending futures to complete. 
waitForFuturesToComplete(); if (exceptionAtomicReference.get() != null) { throw exceptionAtomicReference.get(); @@ -188,8 +189,17 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed } private void waitForFuturesToComplete() { - while (isRunning && !outstandingFutures.isEmpty()) { - outstandingFutures.removeIf(ApiFuture::isDone); + // We have to synchronize on numPendingFutures here to ensure the notification won't be missed. + synchronized (numPendingFutures) { + while (isRunning && numPendingFutures.get() > 0) { + try { + numPendingFutures.wait(); + } catch (InterruptedException e) { + // Simply catch the interrupted exception. Supposedly the thread will exit the loop + // gracefully when it checks the isRunning flag. + LOG.info("Interrupted when waiting for futures to complete"); + } + } } } @@ -202,7 +212,7 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed * * @param <IN> Type of PubSubSink to create. */ - public static class PubSubSinkBuilder<IN> implements SerializationSchemaBuilder<IN>, ProjectNameBuilder<IN>, TopicNameBuilder<IN> { + public static class PubSubSinkBuilder<IN> implements ProjectNameBuilder<IN>, TopicNameBuilder<IN> { private SerializationSchema<IN> serializationSchema; private String projectName; private String topicName; @@ -210,7 +220,9 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed private Credentials credentials; private String hostAndPort; - private PubSubSinkBuilder() { } + private PubSubSinkBuilder(SerializationSchema<IN> serializationSchema) { + this.serializationSchema = serializationSchema; + } /** * Set the credentials. 
@@ -225,13 +237,6 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed } @Override - public ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema) { - Preconditions.checkNotNull(serializationSchema); - this.serializationSchema = serializationSchema; - return this; - } - - @Override public TopicNameBuilder<IN> withProjectName(String projectName) { Preconditions.checkNotNull(projectName); this.projectName = projectName; @@ -275,11 +280,13 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed /** * Part of {@link PubSubSinkBuilder} to set required fields. */ - public interface SerializationSchemaBuilder<IN> { + public static class SerializationSchemaBuilder { /** * Set the SerializationSchema used to Serialize objects to be added as payloads of PubSubMessages. */ - ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> deserializationSchema); + public <IN> ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> deserializationSchema) { + return new PubSubSinkBuilder<>(deserializationSchema); + } } /** @@ -305,13 +312,24 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed private class FailureHandler implements ApiFutureCallback<String>, Serializable { @Override public void onFailure(Throwable t) { + ackAndMaybeNotifyNoPendingFutures(); exceptionAtomicReference.set(new RuntimeException("Failed trying to publish message", t)); } @Override public void onSuccess(String result) { - //do nothing on success + ackAndMaybeNotifyNoPendingFutures(); LOG.debug("Successfully published message with id: {}", result); } + + private void ackAndMaybeNotifyNoPendingFutures() { + // When there are no pending futures anymore, notify the thread that is waiting for + // all the pending futures to be completed. 
+ if (numPendingFutures.decrementAndGet() == 0) { + synchronized (numPendingFutures) { + numPendingFutures.notify(); + } + } + } } } diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java index d093e0a..4ddd816 100644 --- a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java +++ b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java @@ -155,8 +155,8 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT> return deserializationSchema.getProducedType(); } - public static <OUT> DeserializationSchemaBuilder<OUT> newBuilder(Class<OUT> clazz) { - return new PubSubSourceBuilder<>(); + public static DeserializationSchemaBuilder newBuilder() { + return new DeserializationSchemaBuilder(); } @Override @@ -189,7 +189,7 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT> * * @param <OUT> The type of objects which will be read */ - public static class PubSubSourceBuilder<OUT> implements DeserializationSchemaBuilder<OUT>, ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> { + public static class PubSubSourceBuilder<OUT> implements ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> { private PubSubDeserializationSchema<OUT> deserializationSchema; private String projectName; private String subscriptionName; @@ -198,25 +198,18 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT> private Credentials credentials; private int maxMessageToAcknowledge = 10000; - protected PubSubSourceBuilder() { - } - - @Override - public ProjectNameBuilder withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) { + private PubSubSourceBuilder(DeserializationSchema<OUT> deserializationSchema) { 
Preconditions.checkNotNull(deserializationSchema); this.deserializationSchema = new DeserializationSchemaWrapper<>(deserializationSchema); - return this; } - @Override - public ProjectNameBuilder withDeserializationSchema(PubSubDeserializationSchema deserializationSchema) { + private PubSubSourceBuilder(PubSubDeserializationSchema<OUT> deserializationSchema) { Preconditions.checkNotNull(deserializationSchema); this.deserializationSchema = deserializationSchema; - return this; } @Override - public SubscriptionNameBuilder withProjectName(String projectName) { + public SubscriptionNameBuilder<OUT> withProjectName(String projectName) { Preconditions.checkNotNull(projectName); this.projectName = projectName; return this; @@ -306,17 +299,21 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT> /** * Part of {@link PubSubSourceBuilder} to set required fields. */ - public interface DeserializationSchemaBuilder<OUT> { + public static class DeserializationSchemaBuilder { /** * Set the DeserializationSchema used to deserialize incoming PubSubMessages. * If you want access to meta data of a PubSubMessage use the overloaded withDeserializationSchema({@link PubSubDeserializationSchema}) method instead. */ - ProjectNameBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema); + public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) { + return new PubSubSourceBuilder<>(deserializationSchema); + } /** * Set the DeserializationSchema used to deserialize incoming PubSubMessages. 
*/ - ProjectNameBuilder<OUT> withDeserializationSchema(PubSubDeserializationSchema<OUT> deserializationSchema); + public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(PubSubDeserializationSchema<OUT> deserializationSchema) { + return new PubSubSourceBuilder<>(deserializationSchema); + } } /** diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java index 0566e01..5c0c3b1 100644 --- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java +++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java @@ -77,7 +77,7 @@ public class EmulatedPubSubSinkTest extends GCloudUnitTestBase { // Sink into pubsub theData - .addSink(PubSubSink.newBuilder(String.class) + .addSink(PubSubSink.newBuilder() .withSerializationSchema(new SimpleStringSchema()) .withProjectName(PROJECT_NAME) .withTopicName(TOPIC_NAME) @@ -116,7 +116,7 @@ public class EmulatedPubSubSinkTest extends GCloudUnitTestBase { env.addSource(new SingleInputSourceFunction()) .map((MapFunction<String, String>) StringUtils::reverse) - .addSink(PubSubSink.newBuilder(String.class) + .addSink(PubSubSink.newBuilder() .withSerializationSchema(new SimpleStringSchema()) .withProjectName(PROJECT_NAME) .withTopicName(TOPIC_NAME) diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java 
index f3f9b0a..b28569c 100644 --- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java +++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java @@ -96,7 +96,7 @@ public class EmulatedPubSubSourceTest extends GCloudUnitTestBase { env.setRestartStrategy(RestartStrategies.noRestart()); DataStream<String> fromPubSub = env - .addSource(PubSubSource.newBuilder(String.class) + .addSource(PubSubSource.newBuilder() .withDeserializationSchema(new BoundedStringDeserializer(10)) .withProjectName(PROJECT_NAME) .withSubscriptionName(SUBSCRIPTION_NAME) diff --git a/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java b/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java index 7b66577..a960176 100644 --- a/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java +++ b/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java @@ -63,13 +63,13 @@ public class PubSubExample { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000L); - env.addSource(PubSubSource.newBuilder(Integer.class) + env.addSource(PubSubSource.newBuilder() .withDeserializationSchema(new IntegerSerializer()) .withProjectName(projectName) .withSubscriptionName(subscriptionName) .build()) .map(PubSubExample::printAndReturn).disableChaining() - .addSink(PubSubSink.newBuilder(Integer.class) + .addSink(PubSubSink.newBuilder() 
.withSerializationSchema(new IntegerSerializer()) .withProjectName(projectName) .withTopicName(outputTopicName).build());