aromanenko-dev commented on a change in pull request #16077:
URL: https://github.com/apache/beam/pull/16077#discussion_r773251457



##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region, String 
endpoint) {
+      return withKinesisClientProvider(
+          new BasicKinesisProvider(credentialsProvider, region, endpoint));
+    }
+
+    /** Configure the AWS SDK retry behavior of the {@link 
KinesisAsyncClient}. */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      return builder().retryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> validated() {

Review comment:
       The returned result is not used later. Is it intended? Should it be 
`public`?

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region, String 
endpoint) {
+      return withKinesisClientProvider(
+          new BasicKinesisProvider(credentialsProvider, region, endpoint));
+    }
+
+    /** Configure the AWS SDK retry behavior of the {@link 
KinesisAsyncClient}. */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      return builder().retryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> validated() {
+      checkArgument(!isEmpty(streamName()), "streamName cannot be empty");
+      checkArgument(partitioner() != null, "partitioner cannot be null");
+      checkArgument(serializer() != null, "serializer cannot be null");
+      checkArgument(kinesisClientProvider() != null, "kinesisClientProvider 
cannot be null");
+      checkArgument(concurrentRequests() > 0, "concurrentRequests must be > 
0");
+      checkArgument(

Review comment:
       Better to check it in `withBatchMaxRecords()`

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region, String 
endpoint) {
+      return withKinesisClientProvider(
+          new BasicKinesisProvider(credentialsProvider, region, endpoint));
+    }
+
+    /** Configure the AWS SDK retry behavior of the {@link 
KinesisAsyncClient}. */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      return builder().retryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> validated() {
+      checkArgument(!isEmpty(streamName()), "streamName cannot be empty");
+      checkArgument(partitioner() != null, "partitioner cannot be null");
+      checkArgument(serializer() != null, "serializer cannot be null");
+      checkArgument(kinesisClientProvider() != null, "kinesisClientProvider 
cannot be null");
+      checkArgument(concurrentRequests() > 0, "concurrentRequests must be > 
0");
+      checkArgument(
+          batchMaxRecords() > 0 && batchMaxRecords() <= 
MAX_RECORDS_PER_REQUEST,
+          "batchMaxRecords must be in [1,%s]",
+          MAX_RECORDS_PER_REQUEST);
+      checkArgument(
+          batchMaxBytes() > 0 && batchMaxBytes() <= MAX_BYTES_PER_REQUEST,
+          "batchMaxBytes must be in [1,%s]",
+          MAX_BYTES_PER_REQUEST);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
MAX_BYTES_PER_RECORD,
+          "aggregatedRecordsMaxBytes must be <= %s",
+          MAX_BYTES_PER_RECORD);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
batchMaxBytes(),
+          "aggregatedRecordsMaxBytes must be <= batchMaxBytes (%s)",
+          batchMaxBytes());
+      return this;
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      validated();
+      if (aggregatedRecords()) {
+        input.apply(ParDo.of(new AggregatedWriterFn<T>(this)));
+      } else {
+        input.apply(ParDo.of(new WriterFn<T>(this)));
+      }
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriterFn<T> extends DoFn<T, Void> {
+      private static final int MIN_PARTIAL_RETRIES = 10; // Retries for 
partial success (throttling)
+      private static final ClientPool<
+              KinesisAsyncClientProvider, RetryConfiguration, 
KinesisAsyncClient>
+          clients = ClientPool.autoClosable((p, c) -> 
p.getKinesisAsyncClient(clientOverride(c)));
+
+      private static ClientOverrideConfiguration 
clientOverride(RetryConfiguration retryConfig) {
+        return (retryConfig == null) ? null : retryConfig.toClientOverride();
+      }
+
+      protected final Write<T> spec;
+
+      private transient AsyncPutRecordsHandler handler;
+      private transient List<PutRecordsRequestEntry> requestEntries;
+      private transient int requestBytes;
+
+      WriterFn(Write<T> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public final void setup() {
+        RetryConfiguration retryConfig = spec.retryConfiguration();
+        FluentBackoff backoff =
+            (retryConfig != null)
+                ? retryConfig
+                    .toThrottlingBackoff()
+                    .withMaxRetries(max(MIN_PARTIAL_RETRIES, 
retryConfig.numRetries()))
+                : FluentBackoff.DEFAULT.withMaxRetries(MIN_PARTIAL_RETRIES);
+        KinesisAsyncClient client = 
clients.retain(spec.kinesisClientProvider(), retryConfig);
+        handler = new AsyncPutRecordsHandler(client, 
spec.concurrentRequests(), backoff);
+      }
+
+      @StartBundle
+      public void startBundle() {
+        requestEntries = new ArrayList<>();
+        requestBytes = 0;
+        handler.reset();
+      }
+
+      @ProcessElement
+      public final void processElement(ProcessContext c) throws Throwable {
+        handler.checkForAsyncFailure();
+        T record = c.element();
+        byte[] data = spec.serializer().apply(record);
+        String pk = spec.partitioner().getPartitionKey(record);
+        String ehk = spec.partitioner().getExplicitHashKey(record);
+
+        validatePartitionKey(pk);
+        if (ehk != null) {
+          validateExplicitHashKey(ehk);
+        }
+        processElement(pk, ehk, data);
+      }
+
+      protected void processElement(
+          String partitionKey, @Nullable String explicitHashKey, byte[] data) 
throws Throwable {
+        addRequestEntry(
+            PutRecordsRequestEntry.builder()
+                .data(SdkBytes.fromByteArrayUnsafe(data))
+                .partitionKey(partitionKey)
+                .explicitHashKey(explicitHashKey)
+                .build(),
+            spec.batchMaxRecords());
+      }
+
+      protected final void addRequestEntry(PutRecordsRequestEntry entry, int 
maxEntries)
+          throws Throwable {
+        int entryBytes =
+            entry.partitionKey().getBytes(UTF_8).length + 
entry.data().asByteArrayUnsafe().length;
+        // check first if new record can still be added to batch, flush 
otherwise
+        if (requestBytes + entryBytes > spec.batchMaxBytes()) {
+          flushEntries();
+        }
+        // the entry might exceed batchMaxBytes, but try anyways
+        requestEntries.add(entry);
+        requestBytes += entryBytes;
+        // flush once batch size limit is reached
+        if (requestEntries.size() >= min(maxEntries, spec.batchMaxRecords())) {
+          flushEntries();
+        }
+      }
+
+      private void flushEntries() throws Throwable {
+        if (!handler.hasErrored() && !requestEntries.isEmpty()) {
+          // Swap lists, luckily no need to synchronize
+          List<PutRecordsRequestEntry> recordsToWrite = requestEntries;
+          requestEntries = new ArrayList<>();
+          requestBytes = 0;
+          handler.putRecords(spec.streamName(), recordsToWrite);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() throws Throwable {
+        flushEntries();
+        handler.waitForCompletion();
+      }
+
+      @Teardown

Review comment:
       `@Teardown` is a "best effort" to be called. Is it ok to rely on it in 
this case?

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region, String 
endpoint) {
+      return withKinesisClientProvider(
+          new BasicKinesisProvider(credentialsProvider, region, endpoint));
+    }
+
+    /** Configure the AWS SDK retry behavior of the {@link 
KinesisAsyncClient}. */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      return builder().retryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> validated() {
+      checkArgument(!isEmpty(streamName()), "streamName cannot be empty");
+      checkArgument(partitioner() != null, "partitioner cannot be null");
+      checkArgument(serializer() != null, "serializer cannot be null");
+      checkArgument(kinesisClientProvider() != null, "kinesisClientProvider 
cannot be null");
+      checkArgument(concurrentRequests() > 0, "concurrentRequests must be > 
0");
+      checkArgument(
+          batchMaxRecords() > 0 && batchMaxRecords() <= 
MAX_RECORDS_PER_REQUEST,
+          "batchMaxRecords must be in [1,%s]",
+          MAX_RECORDS_PER_REQUEST);
+      checkArgument(
+          batchMaxBytes() > 0 && batchMaxBytes() <= MAX_BYTES_PER_REQUEST,
+          "batchMaxBytes must be in [1,%s]",
+          MAX_BYTES_PER_REQUEST);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
MAX_BYTES_PER_RECORD,
+          "aggregatedRecordsMaxBytes must be <= %s",
+          MAX_BYTES_PER_RECORD);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
batchMaxBytes(),
+          "aggregatedRecordsMaxBytes must be <= batchMaxBytes (%s)",
+          batchMaxBytes());
+      return this;
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      validated();
+      if (aggregatedRecords()) {
+        input.apply(ParDo.of(new AggregatedWriterFn<T>(this)));
+      } else {
+        input.apply(ParDo.of(new WriterFn<T>(this)));
+      }
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriterFn<T> extends DoFn<T, Void> {
+      private static final int MIN_PARTIAL_RETRIES = 10; // Retries for 
partial success (throttling)
+      private static final ClientPool<
+              KinesisAsyncClientProvider, RetryConfiguration, 
KinesisAsyncClient>
+          clients = ClientPool.autoClosable((p, c) -> 
p.getKinesisAsyncClient(clientOverride(c)));
+
+      private static ClientOverrideConfiguration 
clientOverride(RetryConfiguration retryConfig) {
+        return (retryConfig == null) ? null : retryConfig.toClientOverride();
+      }
+
+      protected final Write<T> spec;
+
+      private transient AsyncPutRecordsHandler handler;
+      private transient List<PutRecordsRequestEntry> requestEntries;
+      private transient int requestBytes;
+
+      WriterFn(Write<T> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public final void setup() {
+        RetryConfiguration retryConfig = spec.retryConfiguration();
+        FluentBackoff backoff =
+            (retryConfig != null)
+                ? retryConfig
+                    .toThrottlingBackoff()
+                    .withMaxRetries(max(MIN_PARTIAL_RETRIES, 
retryConfig.numRetries()))
+                : FluentBackoff.DEFAULT.withMaxRetries(MIN_PARTIAL_RETRIES);
+        KinesisAsyncClient client = 
clients.retain(spec.kinesisClientProvider(), retryConfig);
+        handler = new AsyncPutRecordsHandler(client, 
spec.concurrentRequests(), backoff);
+      }
+
+      @StartBundle
+      public void startBundle() {
+        requestEntries = new ArrayList<>();
+        requestBytes = 0;
+        handler.reset();
+      }
+
+      @ProcessElement
+      public final void processElement(ProcessContext c) throws Throwable {
+        handler.checkForAsyncFailure();
+        T record = c.element();
+        byte[] data = spec.serializer().apply(record);
+        String pk = spec.partitioner().getPartitionKey(record);
+        String ehk = spec.partitioner().getExplicitHashKey(record);
+
+        validatePartitionKey(pk);
+        if (ehk != null) {
+          validateExplicitHashKey(ehk);
+        }
+        processElement(pk, ehk, data);
+      }
+
+      protected void processElement(
+          String partitionKey, @Nullable String explicitHashKey, byte[] data) 
throws Throwable {
+        addRequestEntry(
+            PutRecordsRequestEntry.builder()
+                .data(SdkBytes.fromByteArrayUnsafe(data))
+                .partitionKey(partitionKey)
+                .explicitHashKey(explicitHashKey)
+                .build(),
+            spec.batchMaxRecords());
+      }
+
+      protected final void addRequestEntry(PutRecordsRequestEntry entry, int 
maxEntries)
+          throws Throwable {
+        int entryBytes =
+            entry.partitionKey().getBytes(UTF_8).length + 
entry.data().asByteArrayUnsafe().length;
+        // check first if new record can still be added to batch, flush 
otherwise
+        if (requestBytes + entryBytes > spec.batchMaxBytes()) {
+          flushEntries();
+        }
+        // the entry might exceed batchMaxBytes, but try anyways
+        requestEntries.add(entry);
+        requestBytes += entryBytes;
+        // flush once batch size limit is reached
+        if (requestEntries.size() >= min(maxEntries, spec.batchMaxRecords())) {
+          flushEntries();
+        }
+      }
+
+      private void flushEntries() throws Throwable {
+        if (!handler.hasErrored() && !requestEntries.isEmpty()) {
+          // Swap lists, luckily no need to synchronize
+          List<PutRecordsRequestEntry> recordsToWrite = requestEntries;
+          requestEntries = new ArrayList<>();
+          requestBytes = 0;
+          handler.putRecords(spec.streamName(), recordsToWrite);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() throws Throwable {
+        flushEntries();
+        handler.waitForCompletion();
+      }
+
+      @Teardown
+      public void teardown() throws Exception {
+        clients.release(spec.kinesisClientProvider(), 
spec.retryConfiguration());
+      }
+
+      private void validatePartitionKey(String pk) {
+        int size = pk != null ? pk.length() : 0;
+        checkArgument(
+            PARTITION_KEY_MIN_LENGTH <= size && size <= 
PARTITION_KEY_MAX_LENGTH,
+            "Invalid partition key of length {}",
+            size);
+      }
+
+      private void validateExplicitHashKey(String hashKey) {
+        BigInteger key = new BigInteger(hashKey);
+        checkArgument(
+            key.compareTo(MIN_HASH_KEY) >= 0 && key.compareTo(MAX_HASH_KEY) <= 
0,
+            "Explicit hash key must be 128-bit number.");
+      }
+    }
+
+    private static class AggregatedWriterFn<T> extends WriterFn<T> {

Review comment:
       Please, add a Javadoc with details of implementation for this class

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region, String 
endpoint) {
+      return withKinesisClientProvider(
+          new BasicKinesisProvider(credentialsProvider, region, endpoint));
+    }
+
+    /** Configure the AWS SDK retry behavior of the {@link 
KinesisAsyncClient}. */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      return builder().retryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> validated() {
+      checkArgument(!isEmpty(streamName()), "streamName cannot be empty");
+      checkArgument(partitioner() != null, "partitioner cannot be null");
+      checkArgument(serializer() != null, "serializer cannot be null");
+      checkArgument(kinesisClientProvider() != null, "kinesisClientProvider 
cannot be null");
+      checkArgument(concurrentRequests() > 0, "concurrentRequests must be > 
0");
+      checkArgument(
+          batchMaxRecords() > 0 && batchMaxRecords() <= 
MAX_RECORDS_PER_REQUEST,
+          "batchMaxRecords must be in [1,%s]",
+          MAX_RECORDS_PER_REQUEST);
+      checkArgument(
+          batchMaxBytes() > 0 && batchMaxBytes() <= MAX_BYTES_PER_REQUEST,
+          "batchMaxBytes must be in [1,%s]",
+          MAX_BYTES_PER_REQUEST);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
MAX_BYTES_PER_RECORD,
+          "aggregatedRecordsMaxBytes must be <= %s",
+          MAX_BYTES_PER_RECORD);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
batchMaxBytes(),
+          "aggregatedRecordsMaxBytes must be <= batchMaxBytes (%s)",
+          batchMaxBytes());
+      return this;
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      validated();
+      if (aggregatedRecords()) {
+        input.apply(ParDo.of(new AggregatedWriterFn<T>(this)));
+      } else {
+        input.apply(ParDo.of(new WriterFn<T>(this)));
+      }
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriterFn<T> extends DoFn<T, Void> {
+      private static final int MIN_PARTIAL_RETRIES = 10; // Retries for 
partial success (throttling)
+      private static final ClientPool<
+              KinesisAsyncClientProvider, RetryConfiguration, 
KinesisAsyncClient>
+          clients = ClientPool.autoClosable((p, c) -> 
p.getKinesisAsyncClient(clientOverride(c)));
+
+      private static ClientOverrideConfiguration 
clientOverride(RetryConfiguration retryConfig) {
+        return (retryConfig == null) ? null : retryConfig.toClientOverride();
+      }
+
+      protected final Write<T> spec;
+
+      private transient AsyncPutRecordsHandler handler;
+      private transient List<PutRecordsRequestEntry> requestEntries;
+      private transient int requestBytes;
+
+      WriterFn(Write<T> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public final void setup() {
+        RetryConfiguration retryConfig = spec.retryConfiguration();
+        FluentBackoff backoff =
+            (retryConfig != null)
+                ? retryConfig
+                    .toThrottlingBackoff()
+                    .withMaxRetries(max(MIN_PARTIAL_RETRIES, 
retryConfig.numRetries()))
+                : FluentBackoff.DEFAULT.withMaxRetries(MIN_PARTIAL_RETRIES);
+        KinesisAsyncClient client = 
clients.retain(spec.kinesisClientProvider(), retryConfig);
+        handler = new AsyncPutRecordsHandler(client, 
spec.concurrentRequests(), backoff);
+      }
+
+      @StartBundle
+      public void startBundle() {
+        requestEntries = new ArrayList<>();
+        requestBytes = 0;
+        handler.reset();
+      }
+
+      @ProcessElement
+      public final void processElement(ProcessContext c) throws Throwable {
+        handler.checkForAsyncFailure();
+        T record = c.element();
+        byte[] data = spec.serializer().apply(record);
+        String pk = spec.partitioner().getPartitionKey(record);
+        String ehk = spec.partitioner().getExplicitHashKey(record);
+
+        validatePartitionKey(pk);
+        if (ehk != null) {
+          validateExplicitHashKey(ehk);
+        }
+        processElement(pk, ehk, data);
+      }
+
+      protected void processElement(
+          String partitionKey, @Nullable String explicitHashKey, byte[] data) 
throws Throwable {
+        addRequestEntry(
+            PutRecordsRequestEntry.builder()
+                .data(SdkBytes.fromByteArrayUnsafe(data))
+                .partitionKey(partitionKey)
+                .explicitHashKey(explicitHashKey)
+                .build(),
+            spec.batchMaxRecords());
+      }
+
+      protected final void addRequestEntry(PutRecordsRequestEntry entry, int 
maxEntries)
+          throws Throwable {
+        int entryBytes =
+            entry.partitionKey().getBytes(UTF_8).length + 
entry.data().asByteArrayUnsafe().length;
+        // check first if new record can still be added to batch, flush 
otherwise
+        if (requestBytes + entryBytes > spec.batchMaxBytes()) {
+          flushEntries();
+        }
+        // the entry might exceed batchMaxBytes, but try anyways
+        requestEntries.add(entry);
+        requestBytes += entryBytes;
+        // flush once batch size limit is reached
+        if (requestEntries.size() >= min(maxEntries, spec.batchMaxRecords())) {
+          flushEntries();
+        }
+      }
+
+      private void flushEntries() throws Throwable {
+        if (!handler.hasErrored() && !requestEntries.isEmpty()) {
+          // Swap lists, luckily no need to synchronize
+          List<PutRecordsRequestEntry> recordsToWrite = requestEntries;
+          requestEntries = new ArrayList<>();
+          requestBytes = 0;
+          handler.putRecords(spec.streamName(), recordsToWrite);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() throws Throwable {
+        flushEntries();
+        handler.waitForCompletion();
+      }
+
+      @Teardown
+      public void teardown() throws Exception {
+        clients.release(spec.kinesisClientProvider(), 
spec.retryConfiguration());
+      }
+
+      private void validatePartitionKey(String pk) {
+        int size = pk != null ? pk.length() : 0;
+        checkArgument(

Review comment:
       nit: `checkState()`

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *

Review comment:
       nit: needless `*` at the end of line.

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region, String 
endpoint) {
+      return withKinesisClientProvider(
+          new BasicKinesisProvider(credentialsProvider, region, endpoint));
+    }
+
+    /** Configure the AWS SDK retry behavior of the {@link 
KinesisAsyncClient}. */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      return builder().retryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> validated() {
+      checkArgument(!isEmpty(streamName()), "streamName cannot be empty");
+      checkArgument(partitioner() != null, "partitioner cannot be null");
+      checkArgument(serializer() != null, "serializer cannot be null");
+      checkArgument(kinesisClientProvider() != null, "kinesisClientProvider 
cannot be null");
+      checkArgument(concurrentRequests() > 0, "concurrentRequests must be > 
0");
+      checkArgument(
+          batchMaxRecords() > 0 && batchMaxRecords() <= 
MAX_RECORDS_PER_REQUEST,
+          "batchMaxRecords must be in [1,%s]",
+          MAX_RECORDS_PER_REQUEST);
+      checkArgument(
+          batchMaxBytes() > 0 && batchMaxBytes() <= MAX_BYTES_PER_REQUEST,
+          "batchMaxBytes must be in [1,%s]",
+          MAX_BYTES_PER_REQUEST);
+      checkArgument(

Review comment:
       Better to check it in `aggregatedRecordsMaxBytes()`

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/write/KinesisPartitioner.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis.write;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Random;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+
+/** Kinesis interface for custom partitioner. */
+public interface KinesisPartitioner<T> extends Serializable {
+  BigInteger MIN_HASH_KEY = new BigInteger("0");

Review comment:
       Do we really need a BigInteger here?

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region, String 
endpoint) {
+      return withKinesisClientProvider(
+          new BasicKinesisProvider(credentialsProvider, region, endpoint));
+    }
+
+    /** Configure the AWS SDK retry behavior of the {@link 
KinesisAsyncClient}. */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      return builder().retryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> validated() {
+      checkArgument(!isEmpty(streamName()), "streamName cannot be empty");
+      checkArgument(partitioner() != null, "partitioner cannot be null");
+      checkArgument(serializer() != null, "serializer cannot be null");
+      checkArgument(kinesisClientProvider() != null, "kinesisClientProvider 
cannot be null");
+      checkArgument(concurrentRequests() > 0, "concurrentRequests must be > 
0");
+      checkArgument(
+          batchMaxRecords() > 0 && batchMaxRecords() <= 
MAX_RECORDS_PER_REQUEST,
+          "batchMaxRecords must be in [1,%s]",
+          MAX_RECORDS_PER_REQUEST);
+      checkArgument(
+          batchMaxBytes() > 0 && batchMaxBytes() <= MAX_BYTES_PER_REQUEST,
+          "batchMaxBytes must be in [1,%s]",
+          MAX_BYTES_PER_REQUEST);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
MAX_BYTES_PER_RECORD,
+          "aggregatedRecordsMaxBytes must be <= %s",
+          MAX_BYTES_PER_RECORD);
+      checkArgument(

Review comment:
       nit: `checkState()`

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region, String 
endpoint) {
+      return withKinesisClientProvider(
+          new BasicKinesisProvider(credentialsProvider, region, endpoint));
+    }
+
+    /** Configure the AWS SDK retry behavior of the {@link 
KinesisAsyncClient}. */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      return builder().retryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> validated() {
+      checkArgument(!isEmpty(streamName()), "streamName cannot be empty");
+      checkArgument(partitioner() != null, "partitioner cannot be null");
+      checkArgument(serializer() != null, "serializer cannot be null");
+      checkArgument(kinesisClientProvider() != null, "kinesisClientProvider 
cannot be null");
+      checkArgument(concurrentRequests() > 0, "concurrentRequests must be > 
0");
+      checkArgument(
+          batchMaxRecords() > 0 && batchMaxRecords() <= 
MAX_RECORDS_PER_REQUEST,
+          "batchMaxRecords must be in [1,%s]",
+          MAX_RECORDS_PER_REQUEST);
+      checkArgument(

Review comment:
       Better to check it in  `withBatchMaxBytes()`

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region, String 
endpoint) {
+      return withKinesisClientProvider(
+          new BasicKinesisProvider(credentialsProvider, region, endpoint));
+    }
+
+    /** Configure the AWS SDK retry behavior of the {@link 
KinesisAsyncClient}. */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      return builder().retryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> validated() {
+      checkArgument(!isEmpty(streamName()), "streamName cannot be empty");
+      checkArgument(partitioner() != null, "partitioner cannot be null");
+      checkArgument(serializer() != null, "serializer cannot be null");
+      checkArgument(kinesisClientProvider() != null, "kinesisClientProvider 
cannot be null");
+      checkArgument(concurrentRequests() > 0, "concurrentRequests must be > 
0");
+      checkArgument(
+          batchMaxRecords() > 0 && batchMaxRecords() <= 
MAX_RECORDS_PER_REQUEST,
+          "batchMaxRecords must be in [1,%s]",
+          MAX_RECORDS_PER_REQUEST);
+      checkArgument(
+          batchMaxBytes() > 0 && batchMaxBytes() <= MAX_BYTES_PER_REQUEST,
+          "batchMaxBytes must be in [1,%s]",
+          MAX_BYTES_PER_REQUEST);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
MAX_BYTES_PER_RECORD,
+          "aggregatedRecordsMaxBytes must be <= %s",
+          MAX_BYTES_PER_RECORD);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
batchMaxBytes(),
+          "aggregatedRecordsMaxBytes must be <= batchMaxBytes (%s)",
+          batchMaxBytes());
+      return this;
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      validated();
+      if (aggregatedRecords()) {
+        input.apply(ParDo.of(new AggregatedWriterFn<T>(this)));
+      } else {
+        input.apply(ParDo.of(new WriterFn<T>(this)));
+      }
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriterFn<T> extends DoFn<T, Void> {
+      private static final int MIN_PARTIAL_RETRIES = 10; // Retries for 
partial success (throttling)
+      private static final ClientPool<
+              KinesisAsyncClientProvider, RetryConfiguration, 
KinesisAsyncClient>
+          clients = ClientPool.autoClosable((p, c) -> 
p.getKinesisAsyncClient(clientOverride(c)));
+
+      private static ClientOverrideConfiguration 
clientOverride(RetryConfiguration retryConfig) {
+        return (retryConfig == null) ? null : retryConfig.toClientOverride();
+      }
+
+      protected final Write<T> spec;
+
+      private transient AsyncPutRecordsHandler handler;
+      private transient List<PutRecordsRequestEntry> requestEntries;
+      private transient int requestBytes;

Review comment:
       why this field is `transient`?

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region, String 
endpoint) {
+      return withKinesisClientProvider(
+          new BasicKinesisProvider(credentialsProvider, region, endpoint));
+    }
+
+    /** Configure the AWS SDK retry behavior of the {@link 
KinesisAsyncClient}. */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      return builder().retryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> validated() {
+      checkArgument(!isEmpty(streamName()), "streamName cannot be empty");
+      checkArgument(partitioner() != null, "partitioner cannot be null");
+      checkArgument(serializer() != null, "serializer cannot be null");
+      checkArgument(kinesisClientProvider() != null, "kinesisClientProvider 
cannot be null");
+      checkArgument(concurrentRequests() > 0, "concurrentRequests must be > 
0");
+      checkArgument(
+          batchMaxRecords() > 0 && batchMaxRecords() <= 
MAX_RECORDS_PER_REQUEST,
+          "batchMaxRecords must be in [1,%s]",
+          MAX_RECORDS_PER_REQUEST);
+      checkArgument(
+          batchMaxBytes() > 0 && batchMaxBytes() <= MAX_BYTES_PER_REQUEST,
+          "batchMaxBytes must be in [1,%s]",
+          MAX_BYTES_PER_REQUEST);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
MAX_BYTES_PER_RECORD,
+          "aggregatedRecordsMaxBytes must be <= %s",
+          MAX_BYTES_PER_RECORD);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
batchMaxBytes(),
+          "aggregatedRecordsMaxBytes must be <= batchMaxBytes (%s)",
+          batchMaxBytes());
+      return this;
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      validated();
+      if (aggregatedRecords()) {
+        input.apply(ParDo.of(new AggregatedWriterFn<T>(this)));
+      } else {
+        input.apply(ParDo.of(new WriterFn<T>(this)));
+      }
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriterFn<T> extends DoFn<T, Void> {
+      private static final int MIN_PARTIAL_RETRIES = 10; // Retries for 
partial success (throttling)
+      private static final ClientPool<
+              KinesisAsyncClientProvider, RetryConfiguration, 
KinesisAsyncClient>
+          clients = ClientPool.autoClosable((p, c) -> 
p.getKinesisAsyncClient(clientOverride(c)));
+
+      private static ClientOverrideConfiguration 
clientOverride(RetryConfiguration retryConfig) {
+        return (retryConfig == null) ? null : retryConfig.toClientOverride();
+      }
+
+      protected final Write<T> spec;
+
+      private transient AsyncPutRecordsHandler handler;
+      private transient List<PutRecordsRequestEntry> requestEntries;
+      private transient int requestBytes;
+
+      WriterFn(Write<T> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public final void setup() {
+        RetryConfiguration retryConfig = spec.retryConfiguration();
+        FluentBackoff backoff =
+            (retryConfig != null)
+                ? retryConfig
+                    .toThrottlingBackoff()
+                    .withMaxRetries(max(MIN_PARTIAL_RETRIES, 
retryConfig.numRetries()))
+                : FluentBackoff.DEFAULT.withMaxRetries(MIN_PARTIAL_RETRIES);
+        KinesisAsyncClient client = 
clients.retain(spec.kinesisClientProvider(), retryConfig);
+        handler = new AsyncPutRecordsHandler(client, 
spec.concurrentRequests(), backoff);
+      }
+
+      @StartBundle
+      public void startBundle() {
+        requestEntries = new ArrayList<>();
+        requestBytes = 0;
+        handler.reset();
+      }
+
+      @ProcessElement
+      public final void processElement(ProcessContext c) throws Throwable {
+        handler.checkForAsyncFailure();
+        T record = c.element();
+        byte[] data = spec.serializer().apply(record);
+        String pk = spec.partitioner().getPartitionKey(record);
+        String ehk = spec.partitioner().getExplicitHashKey(record);

Review comment:
       nit: `partitionKey` and `explicitHashKey` ?

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region, String 
endpoint) {
+      return withKinesisClientProvider(
+          new BasicKinesisProvider(credentialsProvider, region, endpoint));
+    }
+
+    /** Configure the AWS SDK retry behavior of the {@link 
KinesisAsyncClient}. */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      return builder().retryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> validated() {
+      checkArgument(!isEmpty(streamName()), "streamName cannot be empty");
+      checkArgument(partitioner() != null, "partitioner cannot be null");
+      checkArgument(serializer() != null, "serializer cannot be null");
+      checkArgument(kinesisClientProvider() != null, "kinesisClientProvider 
cannot be null");
+      checkArgument(concurrentRequests() > 0, "concurrentRequests must be > 
0");
+      checkArgument(
+          batchMaxRecords() > 0 && batchMaxRecords() <= 
MAX_RECORDS_PER_REQUEST,
+          "batchMaxRecords must be in [1,%s]",
+          MAX_RECORDS_PER_REQUEST);
+      checkArgument(
+          batchMaxBytes() > 0 && batchMaxBytes() <= MAX_BYTES_PER_REQUEST,
+          "batchMaxBytes must be in [1,%s]",
+          MAX_BYTES_PER_REQUEST);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
MAX_BYTES_PER_RECORD,
+          "aggregatedRecordsMaxBytes must be <= %s",
+          MAX_BYTES_PER_RECORD);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
batchMaxBytes(),
+          "aggregatedRecordsMaxBytes must be <= batchMaxBytes (%s)",
+          batchMaxBytes());
+      return this;
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      validated();
+      if (aggregatedRecords()) {
+        input.apply(ParDo.of(new AggregatedWriterFn<T>(this)));
+      } else {
+        input.apply(ParDo.of(new WriterFn<T>(this)));
+      }
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriterFn<T> extends DoFn<T, Void> {
+      private static final int MIN_PARTIAL_RETRIES = 10; // Retries for 
partial success (throttling)
+      private static final ClientPool<
+              KinesisAsyncClientProvider, RetryConfiguration, 
KinesisAsyncClient>
+          clients = ClientPool.autoClosable((p, c) -> 
p.getKinesisAsyncClient(clientOverride(c)));
+
+      private static ClientOverrideConfiguration 
clientOverride(RetryConfiguration retryConfig) {
+        return (retryConfig == null) ? null : retryConfig.toClientOverride();
+      }
+
+      protected final Write<T> spec;
+
+      private transient AsyncPutRecordsHandler handler;
+      private transient List<PutRecordsRequestEntry> requestEntries;
+      private transient int requestBytes;
+
+      WriterFn(Write<T> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public final void setup() {
+        RetryConfiguration retryConfig = spec.retryConfiguration();
+        FluentBackoff backoff =
+            (retryConfig != null)
+                ? retryConfig
+                    .toThrottlingBackoff()
+                    .withMaxRetries(max(MIN_PARTIAL_RETRIES, 
retryConfig.numRetries()))
+                : FluentBackoff.DEFAULT.withMaxRetries(MIN_PARTIAL_RETRIES);
+        KinesisAsyncClient client = 
clients.retain(spec.kinesisClientProvider(), retryConfig);
+        handler = new AsyncPutRecordsHandler(client, 
spec.concurrentRequests(), backoff);
+      }
+
+      @StartBundle
+      public void startBundle() {
+        requestEntries = new ArrayList<>();
+        requestBytes = 0;
+        handler.reset();
+      }
+
+      @ProcessElement
+      public final void processElement(ProcessContext c) throws Throwable {
+        handler.checkForAsyncFailure();
+        T record = c.element();
+        byte[] data = spec.serializer().apply(record);
+        String pk = spec.partitioner().getPartitionKey(record);
+        String ehk = spec.partitioner().getExplicitHashKey(record);
+
+        validatePartitionKey(pk);
+        if (ehk != null) {
+          validateExplicitHashKey(ehk);
+        }
+        processElement(pk, ehk, data);
+      }
+
+      protected void processElement(
+          String partitionKey, @Nullable String explicitHashKey, byte[] data) 
throws Throwable {
+        addRequestEntry(
+            PutRecordsRequestEntry.builder()
+                .data(SdkBytes.fromByteArrayUnsafe(data))
+                .partitionKey(partitionKey)
+                .explicitHashKey(explicitHashKey)
+                .build(),
+            spec.batchMaxRecords());
+      }
+
+      protected final void addRequestEntry(PutRecordsRequestEntry entry, int 
maxEntries)
+          throws Throwable {
+        int entryBytes =
+            entry.partitionKey().getBytes(UTF_8).length + 
entry.data().asByteArrayUnsafe().length;
+        // check first if new record can still be added to batch, flush 
otherwise
+        if (requestBytes + entryBytes > spec.batchMaxBytes()) {
+          flushEntries();
+        }
+        // the entry might exceed batchMaxBytes, but try anyways
+        requestEntries.add(entry);
+        requestBytes += entryBytes;
+        // flush once batch size limit is reached
+        if (requestEntries.size() >= min(maxEntries, spec.batchMaxRecords())) {
+          flushEntries();
+        }
+      }
+
+      private void flushEntries() throws Throwable {
+        if (!handler.hasErrored() && !requestEntries.isEmpty()) {
+          // Swap lists, luckily no need to synchronize
+          List<PutRecordsRequestEntry> recordsToWrite = requestEntries;
+          requestEntries = new ArrayList<>();
+          requestBytes = 0;
+          handler.putRecords(spec.streamName(), recordsToWrite);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() throws Throwable {
+        flushEntries();
+        handler.waitForCompletion();
+      }
+
+      @Teardown
+      public void teardown() throws Exception {
+        clients.release(spec.kinesisClientProvider(), 
spec.retryConfiguration());
+      }
+
+      private void validatePartitionKey(String pk) {

Review comment:
       nit: `String partitionKey`

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -522,4 +651,393 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) {
       return input.apply(transform);
     }
   }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int DEFAULT_CONCURRENCY = 1;
+
+    abstract @Nullable String streamName();
+
+    abstract int batchMaxRecords();
+
+    abstract int batchMaxBytes();
+
+    abstract int concurrentRequests();
+
+    abstract boolean aggregatedRecords();
+
+    abstract int aggregatedRecordsMaxBytes();
+
+    abstract @Nullable KinesisPartitioner<T> partitioner();
+
+    abstract @Nullable SerializableFunction<T, byte[]> serializer();
+
+    abstract @Nullable KinesisAsyncClientProvider kinesisClientProvider();
+
+    abstract @Nullable RetryConfiguration retryConfiguration();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> streamName(String streamName);
+
+      abstract Builder<T> batchMaxRecords(int records);
+
+      abstract Builder<T> batchMaxBytes(int bytes);
+
+      abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+      abstract Builder<T> aggregatedRecords(boolean aggregatedRecords);
+
+      abstract Builder<T> aggregatedRecordsMaxBytes(int bytes);
+
+      abstract Builder<T> partitioner(KinesisPartitioner<T> partitioner);
+
+      abstract Builder<T> serializer(SerializableFunction<T, byte[]> 
serializer);
+
+      abstract Builder<T> kinesisClientProvider(KinesisAsyncClientProvider 
clientProvider);
+
+      abstract Builder<T> retryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Write<T> build();
+    }
+
+    /** Kinesis stream name which will be used for writing (required). */
+    public Write<T> withStreamName(String streamName) {
+      return builder().streamName(streamName).build();
+    }
+
+    /** Max. number of records to send per batch write request. */
+    public Write<T> withBatchMaxRecords(int records) {
+      return builder().batchMaxRecords(records).build();
+    }
+
+    /**
+     * Max. number of bytes to send per batch write request.
+     *
+     * <p>Single records that exceed this limit are sent individually. Though, 
be careful to not
+     * violate the AWS API limit of 1MB per request.
+     *
+     * <p>This includes both partition keys and data.
+     */
+    public Write<T> withBatchMaxBytes(int bytes) {
+      return builder().batchMaxBytes(bytes).build();
+    }
+
+    /** Max number of concurrent batch write requests per bundle. */
+    public Write<T> withConcurrentRequests(int concurrentRequests) {
+      return builder().concurrentRequests(concurrentRequests).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(boolean enabled) {
+      return builder().aggregatedRecords(enabled).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords() {
+      return builder().aggregatedRecords(true).build();
+    }
+
+    /**
+     * Enable record aggregation that is compatible with the KPL.
+     *
+     * 
<p>https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
+     *
+     * <p>Note: The aggregation is a lot simpler than the one offered by KPL. 
It only aggregates
+     * records with the same partition key as it's not aware of explicit hash 
key ranges per shard.
+     */
+    public Write<T> withAggregatedRecords(int maxAggregatedBytes) {
+      return builder()
+          .aggregatedRecords(true)
+          .aggregatedRecordsMaxBytes(maxAggregatedBytes)
+          .build();
+    }
+
+    /**
+     * Specify how to partition records among all stream shards (required).
+     *
+     * <p>The partitioner is critical to distribute new records among all 
stream shards.
+     */
+    public Write<T> withPartitioner(KinesisPartitioner<T> partitioner) {
+      return builder().partitioner(partitioner).build();
+    }
+
+    /** Specify how to serialize records to bytes on the stream (required). */
+    public Write<T> withSerializer(SerializableFunction<T, byte[]> serializer) 
{
+      return builder().serializer(serializer).build();
+    }
+
+    /**
+     * Specify a custom {@link KinesisAsyncClientProvider} providing a {@link 
KinesisAsyncClient} to
+     * communication with Kinesis.
+     *
+     * <p>When sufficient use {@link 
Write#withKinesisClientProvider(AwsCredentialsProvider,
+     * Region)}.
+     */
+    public Write<T> withKinesisClientProvider(KinesisAsyncClientProvider 
clientProvider) {
+      return builder().kinesisClientProvider(clientProvider).build();
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis.
+     */
+    public Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region) {
+      return withKinesisClientProvider(new 
BasicKinesisProvider(credentialsProvider, region, null));
+    }
+
+    /**
+     * Specify the {@link AwsCredentialsProvider} and AWS region to be used to 
communicate with
+     * Kinesis. *
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    Write<T> withKinesisClientProvider(
+        AwsCredentialsProvider credentialsProvider, Region region, String 
endpoint) {
+      return withKinesisClientProvider(
+          new BasicKinesisProvider(credentialsProvider, region, endpoint));
+    }
+
+    /** Configure the AWS SDK retry behavior of the {@link 
KinesisAsyncClient}. */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      return builder().retryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> validated() {
+      checkArgument(!isEmpty(streamName()), "streamName cannot be empty");
+      checkArgument(partitioner() != null, "partitioner cannot be null");
+      checkArgument(serializer() != null, "serializer cannot be null");
+      checkArgument(kinesisClientProvider() != null, "kinesisClientProvider 
cannot be null");
+      checkArgument(concurrentRequests() > 0, "concurrentRequests must be > 
0");
+      checkArgument(
+          batchMaxRecords() > 0 && batchMaxRecords() <= 
MAX_RECORDS_PER_REQUEST,
+          "batchMaxRecords must be in [1,%s]",
+          MAX_RECORDS_PER_REQUEST);
+      checkArgument(
+          batchMaxBytes() > 0 && batchMaxBytes() <= MAX_BYTES_PER_REQUEST,
+          "batchMaxBytes must be in [1,%s]",
+          MAX_BYTES_PER_REQUEST);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
MAX_BYTES_PER_RECORD,
+          "aggregatedRecordsMaxBytes must be <= %s",
+          MAX_BYTES_PER_RECORD);
+      checkArgument(
+          !aggregatedRecords() || aggregatedRecordsMaxBytes() <= 
batchMaxBytes(),
+          "aggregatedRecordsMaxBytes must be <= batchMaxBytes (%s)",
+          batchMaxBytes());
+      return this;
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      validated();
+      if (aggregatedRecords()) {
+        input.apply(ParDo.of(new AggregatedWriterFn<T>(this)));
+      } else {
+        input.apply(ParDo.of(new WriterFn<T>(this)));
+      }
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriterFn<T> extends DoFn<T, Void> {
+      private static final int MIN_PARTIAL_RETRIES = 10; // Retries for 
partial success (throttling)
+      private static final ClientPool<
+              KinesisAsyncClientProvider, RetryConfiguration, 
KinesisAsyncClient>
+          clients = ClientPool.autoClosable((p, c) -> 
p.getKinesisAsyncClient(clientOverride(c)));
+
+      private static ClientOverrideConfiguration 
clientOverride(RetryConfiguration retryConfig) {
+        return (retryConfig == null) ? null : retryConfig.toClientOverride();
+      }
+
+      protected final Write<T> spec;
+
+      private transient AsyncPutRecordsHandler handler;
+      private transient List<PutRecordsRequestEntry> requestEntries;
+      private transient int requestBytes;
+
+      WriterFn(Write<T> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public final void setup() {
+        RetryConfiguration retryConfig = spec.retryConfiguration();
+        FluentBackoff backoff =
+            (retryConfig != null)
+                ? retryConfig
+                    .toThrottlingBackoff()
+                    .withMaxRetries(max(MIN_PARTIAL_RETRIES, 
retryConfig.numRetries()))
+                : FluentBackoff.DEFAULT.withMaxRetries(MIN_PARTIAL_RETRIES);
+        KinesisAsyncClient client = 
clients.retain(spec.kinesisClientProvider(), retryConfig);
+        handler = new AsyncPutRecordsHandler(client, 
spec.concurrentRequests(), backoff);
+      }
+
+      @StartBundle
+      public void startBundle() {
+        requestEntries = new ArrayList<>();
+        requestBytes = 0;
+        handler.reset();
+      }
+
+      @ProcessElement
+      public final void processElement(ProcessContext c) throws Throwable {
+        handler.checkForAsyncFailure();
+        T record = c.element();
+        byte[] data = spec.serializer().apply(record);
+        String pk = spec.partitioner().getPartitionKey(record);
+        String ehk = spec.partitioner().getExplicitHashKey(record);
+
+        validatePartitionKey(pk);
+        if (ehk != null) {
+          validateExplicitHashKey(ehk);
+        }
+        processElement(pk, ehk, data);
+      }
+
+      protected void processElement(
+          String partitionKey, @Nullable String explicitHashKey, byte[] data) 
throws Throwable {
+        addRequestEntry(
+            PutRecordsRequestEntry.builder()
+                .data(SdkBytes.fromByteArrayUnsafe(data))
+                .partitionKey(partitionKey)
+                .explicitHashKey(explicitHashKey)
+                .build(),
+            spec.batchMaxRecords());
+      }
+
+      protected final void addRequestEntry(PutRecordsRequestEntry entry, int 
maxEntries)
+          throws Throwable {
+        int entryBytes =
+            entry.partitionKey().getBytes(UTF_8).length + 
entry.data().asByteArrayUnsafe().length;
+        // check first if new record can still be added to batch, flush 
otherwise
+        if (requestBytes + entryBytes > spec.batchMaxBytes()) {
+          flushEntries();
+        }
+        // the entry might exceed batchMaxBytes, but try anyways
+        requestEntries.add(entry);
+        requestBytes += entryBytes;
+        // flush once batch size limit is reached
+        if (requestEntries.size() >= min(maxEntries, spec.batchMaxRecords())) {
+          flushEntries();
+        }
+      }
+
+      private void flushEntries() throws Throwable {
+        if (!handler.hasErrored() && !requestEntries.isEmpty()) {
+          // Swap lists, luckily no need to synchronize
+          List<PutRecordsRequestEntry> recordsToWrite = requestEntries;
+          requestEntries = new ArrayList<>();
+          requestBytes = 0;
+          handler.putRecords(spec.streamName(), recordsToWrite);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() throws Throwable {
+        flushEntries();
+        handler.waitForCompletion();
+      }
+
+      @Teardown
+      public void teardown() throws Exception {
+        clients.release(spec.kinesisClientProvider(), 
spec.retryConfiguration());
+      }
+
+      private void validatePartitionKey(String pk) {
+        int size = pk != null ? pk.length() : 0;
+        checkArgument(
+            PARTITION_KEY_MIN_LENGTH <= size && size <= 
PARTITION_KEY_MAX_LENGTH,
+            "Invalid partition key of length {}",
+            size);
+      }
+
+      private void validateExplicitHashKey(String hashKey) {
+        BigInteger key = new BigInteger(hashKey);
+        checkArgument(

Review comment:
       nit: checkState()

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -252,6 +371,16 @@ public static Read read() {
         .build();
   }
 
+  public static <T> Write<T> write() {
+    return new AutoValue_KinesisIO_Write.Builder<T>()
+        .batchMaxRecords(MAX_RECORDS_PER_REQUEST)
+        .batchMaxBytes((int) (MAX_BYTES_PER_REQUEST * 0.9)) // allow some 
error margin
+        .concurrentRequests(Write.DEFAULT_CONCURRENCY)
+        .aggregatedRecords(false)

Review comment:
       Should it be enabled by default?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to