rmetzger commented on a change in pull request #12846:
URL: https://github.com/apache/flink/pull/12846#discussion_r458597136
##########
File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
##########
@@ -97,17 +100,31 @@ public void open(Configuration configuration) throws Exception {
         serializationSchema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
         Publisher.Builder builder = Publisher
-            .newBuilder(ProjectTopicName.of(projectName, topicName))
+            .newBuilder(TopicName.of(projectName, topicName))
             .setCredentialsProvider(FixedCredentialsProvider.create(credentials));
+        // Having the host and port for the emulator means we are in a testing scenario.
         if (hostAndPortForEmulator != null) {
             managedChannel = ManagedChannelBuilder
                 .forTarget(hostAndPortForEmulator)
-                .usePlaintext(true) // This is 'Ok' because this is ONLY used for testing.
+                .usePlaintext() // This is 'Ok' because this is ONLY used for testing.
                 .build();
             channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
             builder.setChannelProvider(FixedTransportChannelProvider.create(channel))
-                .setCredentialsProvider(NoCredentialsProvider.create());
+                .setCredentialsProvider(EmulatorCredentialsProvider.create())
+                // In test scenarios we are limiting the Retry settings.
+                // The values here are based on the default settings with lower attempts and timeouts.
+                .setRetrySettings(
+                    RetrySettings.newBuilder()
+                        .setMaxAttempts(10)
+                        .setTotalTimeout(Duration.ofSeconds(10))
+                        .setInitialRetryDelay(Duration.ofMillis(100))
+                        .setRetryDelayMultiplier(1.3)
+                        .setMaxRetryDelay(Duration.ofSeconds(5))
+                        .setInitialRpcTimeout(Duration.ofSeconds(5))
+                        .setRpcTimeoutMultiplier(1)
+                        .setMaxRpcTimeout(Duration.ofSeconds(10))
+                        .build());
Review comment:
You are right, it would be an unrelated refactoring to address this
issue (I personally try to reduce code smells like this when I have the
opportunity to do it, but I understand that keeping the scope of a change small
and self-contained is also valuable).
I've filed a JIRA ticket to address this in the future:
https://issues.apache.org/jira/browse/FLINK-18669