rmetzger commented on a change in pull request #12846:
URL: https://github.com/apache/flink/pull/12846#discussion_r457380452



##########
File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
##########
@@ -97,17 +100,31 @@ public void open(Configuration configuration) throws 
Exception {
                serializationSchema.open(() -> 
getRuntimeContext().getMetricGroup().addGroup("user"));
 
                Publisher.Builder builder = Publisher
-                       .newBuilder(ProjectTopicName.of(projectName, topicName))
+                       .newBuilder(TopicName.of(projectName, topicName))
                        
.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
 
+               // Having the host and port for the emulator means we are in a 
testing scenario.
                if (hostAndPortForEmulator != null) {
                        managedChannel = ManagedChannelBuilder
                                .forTarget(hostAndPortForEmulator)
-                               .usePlaintext(true) // This is 'Ok' because 
this is ONLY used for testing.
+                               .usePlaintext() // This is 'Ok' because this is 
ONLY used for testing.
                                .build();
                        channel = 
GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
                        
builder.setChannelProvider(FixedTransportChannelProvider.create(channel))
-                                       
.setCredentialsProvider(NoCredentialsProvider.create());
+                                       
.setCredentialsProvider(EmulatorCredentialsProvider.create())
+                                       // In test scenarios we are limiting 
the Retry settings.
+                                       // The values here are based on the 
default settings with lower attempts and timeouts.
+                                       .setRetrySettings(
+                                               RetrySettings.newBuilder()
+                                                       .setMaxAttempts(10)
+                                                       
.setTotalTimeout(Duration.ofSeconds(10))
+                                                       
.setInitialRetryDelay(Duration.ofMillis(100))
+                                                       
.setRetryDelayMultiplier(1.3)
+                                                       
.setMaxRetryDelay(Duration.ofSeconds(5))
+                                                       
.setInitialRpcTimeout(Duration.ofSeconds(5))
+                                                       
.setRpcTimeoutMultiplier(1)
+                                                       
.setMaxRpcTimeout(Duration.ofSeconds(10))
+                                                       .build());

Review comment:
       It seems a bit problematic to have testing code in the main sink code: 
Wouldn't it be generally a good idea to allow the connector's users to 
configure the retry (and possibly all settings) themselves?
   In my experience with the connectors, as more and more people start using 
them, they'll demand for access to all configuration options.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to