nielsbasjes commented on a change in pull request #12846:
URL: https://github.com/apache/flink/pull/12846#discussion_r457948500
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
##########
@@ -97,17 +100,31 @@ public void open(Configuration configuration) throws
Exception {
serializationSchema.open(() ->
getRuntimeContext().getMetricGroup().addGroup("user"));
Publisher.Builder builder = Publisher
- .newBuilder(ProjectTopicName.of(projectName, topicName))
+ .newBuilder(TopicName.of(projectName, topicName))
.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
+ // Having the host and port for the emulator means we are in a
testing scenario.
if (hostAndPortForEmulator != null) {
managedChannel = ManagedChannelBuilder
.forTarget(hostAndPortForEmulator)
- .usePlaintext(true) // This is 'Ok' because
this is ONLY used for testing.
+ .usePlaintext() // This is 'Ok' because this is
ONLY used for testing.
.build();
channel =
GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
builder.setChannelProvider(FixedTransportChannelProvider.create(channel))
-
.setCredentialsProvider(NoCredentialsProvider.create());
+
.setCredentialsProvider(EmulatorCredentialsProvider.create())
+ // In test scenarios we are limiting
the Retry settings.
+ // The values here are based on the
default settings with lower attempts and timeouts.
+ .setRetrySettings(
+ RetrySettings.newBuilder()
+ .setMaxAttempts(10)
+
.setTotalTimeout(Duration.ofSeconds(10))
+
.setInitialRetryDelay(Duration.ofMillis(100))
+
.setRetryDelayMultiplier(1.3)
+
.setMaxRetryDelay(Duration.ofSeconds(5))
+
.setInitialRpcTimeout(Duration.ofSeconds(5))
+
.setRpcTimeoutMultiplier(1)
+
.setMaxRpcTimeout(Duration.ofSeconds(10))
+ .build());
Review comment:
I chose to deviate from the existing code as little as possible.
My primary focus is to update to the latest Google dependencies and to make
sure it all really works.
So these changes in the retry settings were put in a codepath that is only
touched when using the emulator and thus testing scenario.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]