echauchot commented on a change in pull request #15955:
URL: https://github.com/apache/beam/pull/15955#discussion_r758362415



##########
File path: 
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -917,30 +971,6 @@ public void startBundle() {
         putFutures = Collections.synchronizedList(new ArrayList<>());
         /** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */
         failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES);
-        initKinesisProducer();
-      }
-
-      private synchronized void initKinesisProducer() {
-        // Init producer config
-        Properties props = spec.getProducerProperties();
-        if (props == null) {
-          props = new Properties();
-        }
-        KinesisProducerConfiguration config = 
KinesisProducerConfiguration.fromProperties(props);
-        // Fix to avoid the following message "WARNING: Exception during 
updateCredentials" during
-        // producer.destroy() call. More details can be found in this thread:
-        // https://github.com/awslabs/amazon-kinesis-producer/issues/10
-        config.setCredentialsRefreshDelay(100);

Review comment:
       Removing this works because the config is now set only when the producer 
is null?

##########
File path: 
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
           getPartitionKey() == null || (getPartitioner() == null),
           "only one of either withPartitionKey() or withPartitioner() is 
possible");
       checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() 
is required");
+      producerConfiguration(); // verify Kinesis producer configuration can be 
build
 
       input.apply(ParDo.of(new KinesisWriterFn(this)));
       return PDone.in(input.getPipeline());
     }
 
-    private static class KinesisWriterFn extends DoFn<byte[], Void> {
+    private KinesisProducerConfiguration producerConfiguration() {
+      Properties props = getProducerProperties();
+      if (props == null) {
+        props = new Properties();
+      }
+      return KinesisProducerConfiguration.fromProperties(props);
+    }
 
+    private static class KinesisWriterFn extends DoFn<byte[], Void> {
       private static final int MAX_NUM_FAILURES = 10;
+      /** Usage count of static, shared Kinesis producer. */
+      private static final AtomicInteger producerRefCount = new 
AtomicInteger();
+
+      /** Static, shared Kinesis producer. */

Review comment:
       comment was needed indeed :+1: 

##########
File path: 
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
           getPartitionKey() == null || (getPartitioner() == null),
           "only one of either withPartitionKey() or withPartitioner() is 
possible");
       checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() 
is required");
+      producerConfiguration(); // verify Kinesis producer configuration can be 
build
 
       input.apply(ParDo.of(new KinesisWriterFn(this)));
       return PDone.in(input.getPipeline());
     }
 
-    private static class KinesisWriterFn extends DoFn<byte[], Void> {
+    private KinesisProducerConfiguration producerConfiguration() {
+      Properties props = getProducerProperties();
+      if (props == null) {
+        props = new Properties();
+      }
+      return KinesisProducerConfiguration.fromProperties(props);
+    }
 
+    private static class KinesisWriterFn extends DoFn<byte[], Void> {
       private static final int MAX_NUM_FAILURES = 10;
+      /** Usage count of static, shared Kinesis producer. */
+      private static final AtomicInteger producerRefCount = new 
AtomicInteger();
+
+      /** Static, shared Kinesis producer. */
+      private static IKinesisProducer producer;
 
       private final KinesisIO.Write spec;
-      private static transient IKinesisProducer producer;
+
       private transient KinesisPartitioner partitioner;
       private transient LinkedBlockingDeque<KinesisWriteException> failures;
       private transient List<Future<UserRecordResult>> putFutures;
 
       KinesisWriterFn(KinesisIO.Write spec) {
         this.spec = spec;
-        initKinesisProducer();
+      }
+
+      /**
+       * Initialize statically shared Kinesis producer if required and count 
usage.
+       *
+       * <p>NOTE: If there is, for whatever reasons, another instance of a 
{@link KinesisWriterFn}
+       * with different producer properties or even a different implementation 
of {@link
+       * AWSClientsProvider}, these changes will be silently discarded in 
favor of an existing
+       * producer instance.
+       */
+      @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
+      private void setupSharedProducer() {
+        synchronized (producerRefCount) {
+          if (producer == null) {
+            producer =
+                
spec.getAWSClientsProvider().createKinesisProducer(spec.producerConfiguration());
+            producerRefCount.set(0);
+          }
+        }
+        producerRefCount.incrementAndGet();
+      }
+
+      /**
+       * Discard statically shared producer if it is not used anymore 
according to the usage count.
+       */
+      @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
+      private void teardownSharedProducer() {
+        synchronized (producerRefCount) {
+          if (producerRefCount.decrementAndGet() == 0) {

Review comment:
       Funny, you implemented a reference-counting mechanism, which is what other 
languages (like Objective-C) use to avoid having a garbage collector in place 
:smile: 
   That works!

##########
File path: 
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
           getPartitionKey() == null || (getPartitioner() == null),
           "only one of either withPartitionKey() or withPartitioner() is 
possible");
       checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() 
is required");
+      producerConfiguration(); // verify Kinesis producer configuration can be 
build
 
       input.apply(ParDo.of(new KinesisWriterFn(this)));
       return PDone.in(input.getPipeline());
     }
 
-    private static class KinesisWriterFn extends DoFn<byte[], Void> {
+    private KinesisProducerConfiguration producerConfiguration() {
+      Properties props = getProducerProperties();
+      if (props == null) {
+        props = new Properties();
+      }
+      return KinesisProducerConfiguration.fromProperties(props);
+    }
 
+    private static class KinesisWriterFn extends DoFn<byte[], Void> {
       private static final int MAX_NUM_FAILURES = 10;
+      /** Usage count of static, shared Kinesis producer. */
+      private static final AtomicInteger producerRefCount = new 
AtomicInteger();
+
+      /** Static, shared Kinesis producer. */
+      private static IKinesisProducer producer;
 
       private final KinesisIO.Write spec;
-      private static transient IKinesisProducer producer;
+
       private transient KinesisPartitioner partitioner;
       private transient LinkedBlockingDeque<KinesisWriteException> failures;
       private transient List<Future<UserRecordResult>> putFutures;
 
       KinesisWriterFn(KinesisIO.Write spec) {
         this.spec = spec;
-        initKinesisProducer();
+      }
+
+      /**
+       * Initialize statically shared Kinesis producer if required and count 
usage.
+       *
+       * <p>NOTE: If there is, for whatever reasons, another instance of a 
{@link KinesisWriterFn}
+       * with different producer properties or even a different implementation 
of {@link
+       * AWSClientsProvider}, these changes will be silently discarded in 
favor of an existing
+       * producer instance.
+       */
+      @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
+      private void setupSharedProducer() {
+        synchronized (producerRefCount) {

Review comment:
       Fair enough, now the intrinsic lock is taken on a static object 
(`producerRefCount`), i.e. it is shared class-wide, where it should be. No more 
interleaving of the initialization instructions :+1: 

##########
File path: 
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
           getPartitionKey() == null || (getPartitioner() == null),
           "only one of either withPartitionKey() or withPartitioner() is 
possible");
       checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() 
is required");
+      producerConfiguration(); // verify Kinesis producer configuration can be 
build
 
       input.apply(ParDo.of(new KinesisWriterFn(this)));
       return PDone.in(input.getPipeline());
     }
 
-    private static class KinesisWriterFn extends DoFn<byte[], Void> {
+    private KinesisProducerConfiguration producerConfiguration() {
+      Properties props = getProducerProperties();
+      if (props == null) {
+        props = new Properties();
+      }
+      return KinesisProducerConfiguration.fromProperties(props);
+    }
 
+    private static class KinesisWriterFn extends DoFn<byte[], Void> {
       private static final int MAX_NUM_FAILURES = 10;
+      /** Usage count of static, shared Kinesis producer. */
+      private static final AtomicInteger producerRefCount = new 
AtomicInteger();
+
+      /** Static, shared Kinesis producer. */
+      private static IKinesisProducer producer;

Review comment:
       Shouldn't this still be transient? Otherwise there will be problems with the 
serialisation of this DoFn.

##########
File path: 
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
           getPartitionKey() == null || (getPartitioner() == null),
           "only one of either withPartitionKey() or withPartitioner() is 
possible");
       checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() 
is required");
+      producerConfiguration(); // verify Kinesis producer configuration can be 
build

Review comment:
       Indeed, we need to check that the conf can be built, but why not store 
the created conf to avoid having to create it again later when the producer is 
set up? `Write` was already serialized and deserialized by the time `expand` is 
called, so there should be no problem with `KinesisProducerConfiguration` not 
being serializable.

##########
File path: 
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
           getPartitionKey() == null || (getPartitioner() == null),
           "only one of either withPartitionKey() or withPartitioner() is 
possible");
       checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() 
is required");
+      producerConfiguration(); // verify Kinesis producer configuration can be 
build

Review comment:
       nit: please rename to createProducerConfiguration
   nit2: sed s/ build / built /g

##########
File path: 
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
           getPartitionKey() == null || (getPartitioner() == null),
           "only one of either withPartitionKey() or withPartitioner() is 
possible");
       checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() 
is required");
+      producerConfiguration(); // verify Kinesis producer configuration can be 
build
 
       input.apply(ParDo.of(new KinesisWriterFn(this)));
       return PDone.in(input.getPipeline());
     }
 
-    private static class KinesisWriterFn extends DoFn<byte[], Void> {
+    private KinesisProducerConfiguration producerConfiguration() {
+      Properties props = getProducerProperties();
+      if (props == null) {
+        props = new Properties();
+      }
+      return KinesisProducerConfiguration.fromProperties(props);
+    }
 
+    private static class KinesisWriterFn extends DoFn<byte[], Void> {
       private static final int MAX_NUM_FAILURES = 10;
+      /** Usage count of static, shared Kinesis producer. */
+      private static final AtomicInteger producerRefCount = new 
AtomicInteger();
+
+      /** Static, shared Kinesis producer. */
+      private static IKinesisProducer producer;
 
       private final KinesisIO.Write spec;
-      private static transient IKinesisProducer producer;
+
       private transient KinesisPartitioner partitioner;
       private transient LinkedBlockingDeque<KinesisWriteException> failures;
       private transient List<Future<UserRecordResult>> putFutures;
 
       KinesisWriterFn(KinesisIO.Write spec) {
         this.spec = spec;
-        initKinesisProducer();
+      }
+
+      /**
+       * Initialize statically shared Kinesis producer if required and count 
usage.
+       *
+       * <p>NOTE: If there is, for whatever reasons, another instance of a 
{@link KinesisWriterFn}
+       * with different producer properties or even a different implementation 
of {@link
+       * AWSClientsProvider}, these changes will be silently discarded in 
favor of an existing
+       * producer instance.
+       */
+      @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
+      private void setupSharedProducer() {
+        synchronized (producerRefCount) {
+          if (producer == null) {
+            producer =
+                
spec.getAWSClientsProvider().createKinesisProducer(spec.producerConfiguration());
+            producerRefCount.set(0);
+          }
+        }
+        producerRefCount.incrementAndGet();
+      }
+
+      /**
+       * Discard statically shared producer if it is not used anymore 
according to the usage count.
+       */
+      @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
+      private void teardownSharedProducer() {
+        synchronized (producerRefCount) {
+          if (producerRefCount.decrementAndGet() == 0) {
+            // the following checks should never be true, but just in case
+            if (producer == null) {
+              return;
+            }
+            if (producer.getOutstandingRecordsCount() > 0) {
+              producer.flushSync();
+            }
+            producer.destroy();

Review comment:
       ouch, destroy() was missing, good catch !

##########
File path: 
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -917,30 +971,6 @@ public void startBundle() {
         putFutures = Collections.synchronizedList(new ArrayList<>());
         /** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */
         failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES);
-        initKinesisProducer();
-      }
-
-      private synchronized void initKinesisProducer() {

Review comment:
       Indeed the intrinsic lock would be on the DoFn instance so the 
initialization threads could interleave between DoFns. :+1: 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to