mosche commented on a change in pull request #15955:
URL: https://github.com/apache/beam/pull/15955#discussion_r757583625
##########
File path:
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -917,30 +971,6 @@ public void startBundle() {
putFutures = Collections.synchronizedList(new ArrayList<>());
/** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */
failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES);
- initKinesisProducer();
- }
-
- private synchronized void initKinesisProducer() {
Review comment:
Yet another issue is synchronisation when creating the producer
instance. If the producer is statically shared among writers in the same JVM,
synchronisation cannot be done on `this`, because each writer instance locks
only its own monitor and so does not exclude the other writers.
Otherwise there might be multiple producer daemons running, with only the
latest one being used.
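A minimal sketch of what I mean, assuming the producer ends up in a static field. `FakeProducer`, `Writer` and `createdAfterConcurrentSetup` are hypothetical stand-ins for illustration only, not the actual classes in `KinesisIO`. The point is just that the lock has to be one that all writer instances share, such as the class object:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedProducerSketch {

  /** Hypothetical stand-in for the real producer; counts how many were created. */
  static final class FakeProducer {
    static final AtomicInteger CREATED = new AtomicInteger();

    FakeProducer() {
      CREATED.incrementAndGet();
    }
  }

  /** Hypothetical stand-in for a writer; several instances exist per JVM. */
  static final class Writer {
    // Shared by every Writer instance in this JVM.
    private static FakeProducer producer;

    void setup() {
      // Lock on the class rather than on `this`: a synchronized instance
      // method would use a different monitor for each Writer instance.
      synchronized (Writer.class) {
        if (producer == null) {
          producer = new FakeProducer();
        }
      }
    }
  }

  /** Runs setup() on `writers` separate instances concurrently; returns the producer count. */
  static int createdAfterConcurrentSetup(int writers) {
    Thread[] threads = new Thread[writers];
    for (int i = 0; i < writers; i++) {
      threads[i] = new Thread(() -> new Writer().setup());
      threads[i].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return FakeProducer.CREATED.get();
  }

  public static void main(String[] args) {
    System.out.println(createdAfterConcurrentSetup(8)); // prints 1
  }
}
```

With `private synchronized void setup()` instead, each of the eight writers would hold a different lock, so two of them could both see `producer == null` and each create an instance.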