echauchot commented on a change in pull request #15955:
URL: https://github.com/apache/beam/pull/15955#discussion_r758362415
##########
File path:
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -917,30 +971,6 @@ public void startBundle() {
putFutures = Collections.synchronizedList(new ArrayList<>());
/** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */
failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES);
- initKinesisProducer();
- }
-
- private synchronized void initKinesisProducer() {
- // Init producer config
- Properties props = spec.getProducerProperties();
- if (props == null) {
- props = new Properties();
- }
- KinesisProducerConfiguration config =
KinesisProducerConfiguration.fromProperties(props);
- // Fix to avoid the following message "WARNING: Exception during
updateCredentials" during
- // producer.destroy() call. More details can be found in this thread:
- // https://github.com/awslabs/amazon-kinesis-producer/issues/10
- config.setCredentialsRefreshDelay(100);
Review comment:
removing this works because, the config is now set only when producer is
null ?
##########
File path:
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
getPartitionKey() == null || (getPartitioner() == null),
"only one of either withPartitionKey() or withPartitioner() is
possible");
checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider()
is required");
+ producerConfiguration(); // verify Kinesis producer configuration can be
build
input.apply(ParDo.of(new KinesisWriterFn(this)));
return PDone.in(input.getPipeline());
}
- private static class KinesisWriterFn extends DoFn<byte[], Void> {
+ private KinesisProducerConfiguration producerConfiguration() {
+ Properties props = getProducerProperties();
+ if (props == null) {
+ props = new Properties();
+ }
+ return KinesisProducerConfiguration.fromProperties(props);
+ }
+ private static class KinesisWriterFn extends DoFn<byte[], Void> {
private static final int MAX_NUM_FAILURES = 10;
+ /** Usage count of static, shared Kinesis producer. */
+ private static final AtomicInteger producerRefCount = new
AtomicInteger();
+
+ /** Static, shared Kinesis producer. */
Review comment:
comment was needed indeed :+1:
##########
File path:
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
getPartitionKey() == null || (getPartitioner() == null),
"only one of either withPartitionKey() or withPartitioner() is
possible");
checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider()
is required");
+ producerConfiguration(); // verify Kinesis producer configuration can be
build
input.apply(ParDo.of(new KinesisWriterFn(this)));
return PDone.in(input.getPipeline());
}
- private static class KinesisWriterFn extends DoFn<byte[], Void> {
+ private KinesisProducerConfiguration producerConfiguration() {
+ Properties props = getProducerProperties();
+ if (props == null) {
+ props = new Properties();
+ }
+ return KinesisProducerConfiguration.fromProperties(props);
+ }
+ private static class KinesisWriterFn extends DoFn<byte[], Void> {
private static final int MAX_NUM_FAILURES = 10;
+ /** Usage count of static, shared Kinesis producer. */
+ private static final AtomicInteger producerRefCount = new
AtomicInteger();
+
+ /** Static, shared Kinesis producer. */
+ private static IKinesisProducer producer;
private final KinesisIO.Write spec;
- private static transient IKinesisProducer producer;
+
private transient KinesisPartitioner partitioner;
private transient LinkedBlockingDeque<KinesisWriteException> failures;
private transient List<Future<UserRecordResult>> putFutures;
KinesisWriterFn(KinesisIO.Write spec) {
this.spec = spec;
- initKinesisProducer();
+ }
+
+ /**
+ * Initialize statically shared Kinesis producer if required and count
usage.
+ *
+ * <p>NOTE: If there is, for whatever reasons, another instance of a
{@link KinesisWriterFn}
+ * with different producer properties or even a different implementation
of {@link
+ * AWSClientsProvider}, these changes will be silently discarded in
favor of an existing
+ * producer instance.
+ */
+ @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
+ private void setupSharedProducer() {
+ synchronized (producerRefCount) {
+ if (producer == null) {
+ producer =
+
spec.getAWSClientsProvider().createKinesisProducer(spec.producerConfiguration());
+ producerRefCount.set(0);
+ }
+ }
+ producerRefCount.incrementAndGet();
+ }
+
+ /**
+ * Discard statically shared producer if it is not used anymore
according to the usage count.
+ */
+ @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
+ private void teardownSharedProducer() {
+ synchronized (producerRefCount) {
+ if (producerRefCount.decrementAndGet() == 0) {
Review comment:
Funny, you implemented a ref counter mechanism, what other languages
(like ObjectiveC) use to avoid having a garbage collector mechanism in place
:smile:
That works !
##########
File path:
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
getPartitionKey() == null || (getPartitioner() == null),
"only one of either withPartitionKey() or withPartitioner() is
possible");
checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider()
is required");
+ producerConfiguration(); // verify Kinesis producer configuration can be
build
input.apply(ParDo.of(new KinesisWriterFn(this)));
return PDone.in(input.getPipeline());
}
- private static class KinesisWriterFn extends DoFn<byte[], Void> {
+ private KinesisProducerConfiguration producerConfiguration() {
+ Properties props = getProducerProperties();
+ if (props == null) {
+ props = new Properties();
+ }
+ return KinesisProducerConfiguration.fromProperties(props);
+ }
+ private static class KinesisWriterFn extends DoFn<byte[], Void> {
private static final int MAX_NUM_FAILURES = 10;
+ /** Usage count of static, shared Kinesis producer. */
+ private static final AtomicInteger producerRefCount = new
AtomicInteger();
+
+ /** Static, shared Kinesis producer. */
+ private static IKinesisProducer producer;
private final KinesisIO.Write spec;
- private static transient IKinesisProducer producer;
+
private transient KinesisPartitioner partitioner;
private transient LinkedBlockingDeque<KinesisWriteException> failures;
private transient List<Future<UserRecordResult>> putFutures;
KinesisWriterFn(KinesisIO.Write spec) {
this.spec = spec;
- initKinesisProducer();
+ }
+
+ /**
+ * Initialize statically shared Kinesis producer if required and count
usage.
+ *
+ * <p>NOTE: If there is, for whatever reasons, another instance of a
{@link KinesisWriterFn}
+ * with different producer properties or even a different implementation
of {@link
+ * AWSClientsProvider}, these changes will be silently discarded in
favor of an existing
+ * producer instance.
+ */
+ @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
+ private void setupSharedProducer() {
+ synchronized (producerRefCount) {
Review comment:
fair enough, now the intrinsic lock is on the Class (as
`producerRefCount` is static) where it should be. No more interleave of the
initialization instructions :+1:
##########
File path:
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
getPartitionKey() == null || (getPartitioner() == null),
"only one of either withPartitionKey() or withPartitioner() is
possible");
checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider()
is required");
+ producerConfiguration(); // verify Kinesis producer configuration can be
build
input.apply(ParDo.of(new KinesisWriterFn(this)));
return PDone.in(input.getPipeline());
}
- private static class KinesisWriterFn extends DoFn<byte[], Void> {
+ private KinesisProducerConfiguration producerConfiguration() {
+ Properties props = getProducerProperties();
+ if (props == null) {
+ props = new Properties();
+ }
+ return KinesisProducerConfiguration.fromProperties(props);
+ }
+ private static class KinesisWriterFn extends DoFn<byte[], Void> {
private static final int MAX_NUM_FAILURES = 10;
+ /** Usage count of static, shared Kinesis producer. */
+ private static final AtomicInteger producerRefCount = new
AtomicInteger();
+
+ /** Static, shared Kinesis producer. */
+ private static IKinesisProducer producer;
Review comment:
should still be transient no ? Otherwise there will be problems with the
serialisation of this DoFn
##########
File path:
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
getPartitionKey() == null || (getPartitioner() == null),
"only one of either withPartitionKey() or withPartitioner() is
possible");
checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider()
is required");
+ producerConfiguration(); // verify Kinesis producer configuration can be
build
Review comment:
indeed, we need to check that the conf can be built but why not store
the created conf to avoid having to create it later on when the producer is set
up ? `Write` was already serialized and deserialized at the time expand is
called so there should be no pb with `KinesisProducerConfiguration` not being
serializable
##########
File path:
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
getPartitionKey() == null || (getPartitioner() == null),
"only one of either withPartitionKey() or withPartitioner() is
possible");
checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider()
is required");
+ producerConfiguration(); // verify Kinesis producer configuration can be
build
Review comment:
nit: please rename to createProducerConfiguration
nit2: sed s/ build / built /g
##########
File path:
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -884,28 +885,81 @@ public PDone expand(PCollection<byte[]> input) {
getPartitionKey() == null || (getPartitioner() == null),
"only one of either withPartitionKey() or withPartitioner() is
possible");
checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider()
is required");
+ producerConfiguration(); // verify Kinesis producer configuration can be
build
input.apply(ParDo.of(new KinesisWriterFn(this)));
return PDone.in(input.getPipeline());
}
- private static class KinesisWriterFn extends DoFn<byte[], Void> {
+ private KinesisProducerConfiguration producerConfiguration() {
+ Properties props = getProducerProperties();
+ if (props == null) {
+ props = new Properties();
+ }
+ return KinesisProducerConfiguration.fromProperties(props);
+ }
+ private static class KinesisWriterFn extends DoFn<byte[], Void> {
private static final int MAX_NUM_FAILURES = 10;
+ /** Usage count of static, shared Kinesis producer. */
+ private static final AtomicInteger producerRefCount = new
AtomicInteger();
+
+ /** Static, shared Kinesis producer. */
+ private static IKinesisProducer producer;
private final KinesisIO.Write spec;
- private static transient IKinesisProducer producer;
+
private transient KinesisPartitioner partitioner;
private transient LinkedBlockingDeque<KinesisWriteException> failures;
private transient List<Future<UserRecordResult>> putFutures;
KinesisWriterFn(KinesisIO.Write spec) {
this.spec = spec;
- initKinesisProducer();
+ }
+
+ /**
+ * Initialize statically shared Kinesis producer if required and count
usage.
+ *
+ * <p>NOTE: If there is, for whatever reasons, another instance of a
{@link KinesisWriterFn}
+ * with different producer properties or even a different implementation
of {@link
+ * AWSClientsProvider}, these changes will be silently discarded in
favor of an existing
+ * producer instance.
+ */
+ @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
+ private void setupSharedProducer() {
+ synchronized (producerRefCount) {
+ if (producer == null) {
+ producer =
+
spec.getAWSClientsProvider().createKinesisProducer(spec.producerConfiguration());
+ producerRefCount.set(0);
+ }
+ }
+ producerRefCount.incrementAndGet();
+ }
+
+ /**
+ * Discard statically shared producer if it is not used anymore
according to the usage count.
+ */
+ @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
+ private void teardownSharedProducer() {
+ synchronized (producerRefCount) {
+ if (producerRefCount.decrementAndGet() == 0) {
+ // the following checks should never be true, but just in case
+ if (producer == null) {
+ return;
+ }
+ if (producer.getOutstandingRecordsCount() > 0) {
+ producer.flushSync();
+ }
+ producer.destroy();
Review comment:
ouch, destroy() was missing, good catch !
##########
File path:
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -917,30 +971,6 @@ public void startBundle() {
putFutures = Collections.synchronizedList(new ArrayList<>());
/** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */
failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES);
- initKinesisProducer();
- }
-
- private synchronized void initKinesisProducer() {
Review comment:
Indeed the intrinsic lock would be on the DoFn instance so the
initialization threads could interleave between DoFns. :+1:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]