This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 25ecc0b [FLINK-24041][connectors] Removed public setter for
elementConverter in Async Sink. Concrete implementations must now construct
this elementConverter
25ecc0b is described below
commit 25ecc0b9202050b340fb4d6fc95b0585a8119937
Author: Zichen Liu <[email protected]>
AuthorDate: Mon Jan 24 15:55:16 2022 +0000
[FLINK-24041][connectors] Removed public setter for elementConverter in
Async Sink. Concrete implementations must now construct this elementConverter
---
.../docs/connectors/datastream/kinesis.md | 174 ++++++++++-----------
docs/content/docs/connectors/datastream/kinesis.md | 54 +++----
.../kinesis/sink/KinesisDataStreamsSink.java | 6 +-
.../sink/KinesisDataStreamsSinkBuilder.java | 28 +++-
.../KinesisDataStreamsSinkElementConverter.java | 19 +--
.../kinesis/sink/PartitionKeyGenerator.java | 31 ++++
.../table/FixedKinesisPartitionKeyGenerator.java | 2 +-
.../kinesis/table/KinesisConnectorOptions.java | 2 +-
.../KinesisDataStreamsConnectorOptionsUtils.java | 4 +-
.../kinesis/table/KinesisDynamicSink.java | 12 +-
.../table/KinesisDynamicTableSinkFactory.java | 5 +-
.../table/KinesisPartitionKeyGeneratorFactory.java | 2 +-
.../table/RandomKinesisPartitionKeyGenerator.java | 2 +-
.../RowDataFieldsKinesisPartitionKeyGenerator.java | 2 +-
.../sink/KinesisDataStreamsSinkBuilderTest.java | 47 ++++--
.../kinesis/sink/KinesisDataStreamsSinkITCase.java | 42 ++---
.../kinesis/sink/examples/SinkIntoKinesis.java | 12 +-
.../flink/connector/base/sink/AsyncSinkBase.java | 2 +-
.../connector/base/sink/AsyncSinkBaseBuilder.java | 16 --
.../connectors/kinesis/KinesisPartitioner.java | 2 +-
20 files changed, 235 insertions(+), 229 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/kinesis.md
b/docs/content.zh/docs/connectors/datastream/kinesis.md
index 60b7881..8ea68fa 100644
--- a/docs/content.zh/docs/connectors/datastream/kinesis.md
+++ b/docs/content.zh/docs/connectors/datastream/kinesis.md
@@ -566,124 +566,116 @@ Retry and backoff parameters can be configured using
the `ConsumerConfigConstant
this is called once per stream during stream consumer deregistration, unless
the `NONE` or `EAGER` registration strategy is configured.
Retry and backoff parameters can be configured using the
`ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.
-## Kinesis Producer
-
-The `FlinkKinesisProducer` uses [Kinesis Producer Library
(KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)
to put data from a Flink stream into a Kinesis stream.
+## Kinesis Data Streams Sink
-Note that the producer is not participating in Flink's checkpointing and
doesn't provide exactly-once processing guarantees. Also, the Kinesis producer
does not guarantee that records are written in order to the shards (See
[here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and
[here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax)
for more details).
+The Kinesis Data Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK
for
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html)
to write data from a Flink stream into a Kinesis stream.
-In case of a failure or a resharding, data will be written again to Kinesis,
leading to duplicates. This behavior is usually called "at-least-once"
semantics.
-
-To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE"
in the AWS dashboard.
+To write data into a Kinesis stream, make sure the stream is marked as
"ACTIVE" in the Amazon Kinesis Data Streams console.
For the monitoring to work, the user accessing the stream needs access to the
CloudWatch service.
{{< tabs "6df3b696-c2ca-4f44-bea0-96cf8275d61c" >}}
{{< tab "Java" >}}
```java
-Properties producerConfig = new Properties();
-// Required configs
-producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key");
-// Optional configs
-producerConfig.put("AggregationMaxCount", "4294967295");
-producerConfig.put("CollectionMaxCount", "1000");
-producerConfig.put("RecordTtl", "30000");
-producerConfig.put("RequestTimeout", "6000");
-producerConfig.put("ThreadPoolSize", "15");
-
-// Disable Aggregation if it's not supported by a consumer
-// producerConfig.put("AggregationEnabled", "false");
-// Switch KinesisProducer's threading model
-// producerConfig.put("ThreadingModel", "PER_REQUEST");
-
-FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new
SimpleStringSchema(), producerConfig);
-kinesis.setFailOnError(true);
-kinesis.setDefaultStream("kinesis_stream_name");
-kinesis.setDefaultPartition("0");
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key");
+
+KinesisDataStreamsSink<String> kdsSink =
+ KinesisDataStreamsSink.<String>builder()
+ .setKinesisClientProperties(sinkProperties)
// Required
+ .setSerializationSchema(new SimpleStringSchema())
// Required
+ .setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode())) // Required
+ .setStreamName("your-stream-name")
// Required
+ .setFailOnError(false)
// Optional
+ .setMaxBatchSize(500)
// Optional
+ .setMaxInFlightRequests(16)
// Optional
+ .setMaxBufferedRequests(10_000)
// Optional
+ .setMaxBatchSizeInBytes(5 * 1024 * 1024)
// Optional
+ .setMaxTimeInBufferMS(5000)
// Optional
+ .setMaxRecordSizeInBytes(1 * 1024 * 1024)
// Optional
+ .build();
DataStream<String> simpleStringStream = ...;
-simpleStringStream.addSink(kinesis);
+simpleStringStream.sinkTo(kdsSink);
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
-val producerConfig = new Properties()
-// Required configs
-producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
-producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key")
-// Optional KPL configs
-producerConfig.put("AggregationMaxCount", "4294967295")
-producerConfig.put("CollectionMaxCount", "1000")
-producerConfig.put("RecordTtl", "30000")
-producerConfig.put("RequestTimeout", "6000")
-producerConfig.put("ThreadPoolSize", "15")
-
-// Disable Aggregation if it's not supported by a consumer
-// producerConfig.put("AggregationEnabled", "false")
-// Switch KinesisProducer's threading model
-// producerConfig.put("ThreadingModel", "PER_REQUEST")
-
-val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema,
producerConfig)
-kinesis.setFailOnError(true)
-kinesis.setDefaultStream("kinesis_stream_name")
-kinesis.setDefaultPartition("0")
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key")
+
+val kdsSink = KinesisDataStreamsSink.<String>builder()
+ .setKinesisClientProperties(sinkProperties)
// Required
+ .setSerializationSchema(new SimpleStringSchema())
// Required
+ .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
// Required
+ .setStreamName("your-stream-name")
// Required
+ .setFailOnError(false)
// Optional
+ .setMaxBatchSize(500)
// Optional
+ .setMaxInFlightRequests(16)
// Optional
+ .setMaxBufferedRequests(10000)
// Optional
+ .setMaxBatchSizeInBytes(5 * 1024 * 1024)
// Optional
+ .setMaxTimeInBufferMS(5000)
// Optional
+ .setMaxRecordSizeInBytes(1 * 1024 * 1024)
// Optional
+ .build()
val simpleStringStream = ...
-simpleStringStream.addSink(kinesis)
+simpleStringStream.sinkTo(kdsSink)
```
{{< /tab >}}
{{< /tabs >}}
-The above is a simple example of using the producer. To initialize
`FlinkKinesisProducer`, users are required to pass in `AWS_REGION`,
`AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` via a `java.util.Properties`
instance. Users can also pass in KPL's configurations as optional parameters to
customize the KPL underlying `FlinkKinesisProducer`. The full list of KPL
configs and explanations can be found
[here](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kines
[...]
+The above is a simple example of using the Kinesis sink. Begin by creating a
`java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and
`AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the
builder. The default values for the optional configurations are shown above.
Some of these defaults are derived from the [Kinesis Data Streams service
limits](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
-If users don't specify any KPL configs and values, `FlinkKinesisProducer` will
use default config values of KPL, except `RateLimit`. `RateLimit` limits the
maximum allowed put rate for a shard, as a percentage of the backend limits.
KPL's default value is 150 but it makes KPL throw `RateLimitExceededException`
too frequently and breaks Flink sink as a result. Thus `FlinkKinesisProducer`
overrides KPL's default value to 100.
+You will always need to supply a `SerializationSchema` and a
`PartitionKeyGenerator` during sink creation. The serialization schema converts
each record into the bytes written to Kinesis, and the partition key generator
derives a [partition
key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key)
from each record.
-Instead of a `SerializationSchema`, it also supports a
`KinesisSerializationSchema`. The `KinesisSerializationSchema` allows to send
the data to multiple streams. This is
-done using the `KinesisSerializationSchema.getTargetStream(T element)` method.
Returning `null` there will instruct the producer to write the element to the
default stream.
-Otherwise, the returned stream name is used.
+Some or all of the records in a request may fail to be persisted by Kinesis
Data Streams for a number of reasons. If `failOnError` is enabled, a runtime
exception will be raised. Otherwise, those records will be requeued in the
buffer and retried.
-### Threading Model
+The Kinesis Sink provides some metrics through Flink's [metrics system]({{<
ref "docs/ops/metrics" >}}) to analyze the behavior of the connector. A list of
all exposed metrics may be found [here]({{<ref
"docs/ops/metrics#kinesis-sink">}}).
-Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL
from a one-thread-per-request mode to a thread-pool mode. KPL in thread-pool
mode uses a queue and thread pool to execute requests to Kinesis. This limits
the number of threads that KPL's native process may create, and therefore
greatly lowers CPU utilization and improves efficiency. **Thus, We highly
recommend Flink users use thread-pool model.** The default thread pool size is
`10`. Users can set the pool siz [...]
+The sink's default maximum record size is 1 MB and its default maximum batch
size is 5 MB, in line with the Kinesis Data Streams limits. The AWS documentation
detailing these limits may be found
[here](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
-Users can still switch back to one-thread-per-request mode by setting a
key-value pair of `ThreadingModel` and `PER_REQUEST` in `java.util.Properties`,
as shown in the code commented out in above example.
+### Kinesis Sinks and Fault Tolerance
-### Backpressure
-
-By default, `FlinkKinesisProducer` does not backpressure. Instead, records that
-cannot be sent because of the rate restriction of 1 MB per second per shard are
-buffered in an unbounded queue and dropped when their `RecordTtl` expires.
+The sink is designed to participate in Flink's checkpointing to provide
at-least-once processing guarantees. It does this by completing any in-flight
requests while taking a checkpoint. This ensures that all requests triggered
before the checkpoint have been successfully delivered to Kinesis Data Streams
before more records are processed.
-To avoid data loss, you can enable backpressuring by restricting the size of
the
-internal queue:
+If Flink needs to restore from a checkpoint (or savepoint), data that has been
written since that checkpoint will be written to Kinesis again, leading to
duplicates in the stream. Moreover, the sink uses the `PutRecords` API call
internally, which does not guarantee that the order of events is preserved.
-```
-// 200 Bytes per record, 1 shard
-kinesis.setQueueLimit(500);
-```
+### Backpressure
-The value for `queueLimit` depends on the expected record size. To choose a
good
-value, consider that Kinesis is rate-limited to 1MB per second per shard. If
-less than one second's worth of records is buffered, then the queue may not be
-able to operate at full capacity. With the default `RecordMaxBufferedTime` of
-100ms, a queue size of 100kB per shard should be sufficient. The `queueLimit`
-can then be computed via
+Backpressure in the sink arises as the sink buffer fills up and writes to the sink
+begin to exhibit blocking behaviour. More information on the rate
restrictions of Kinesis Data Streams may be
+found at [Quotas and
Limits](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
+You generally reduce backpressure by increasing the size of the internal queue:
```
-queue limit = (number of shards * queue size per shard) / record size
+KinesisDataStreamsSink<String> kdsSink =
+ KinesisDataStreamsSink.<String>builder()
+ ...
+ .setMaxBufferedRequests(10_000)
+ ...
```
-E.g. for 200Bytes per record and 8 shards, a queue limit of 4000 is a good
-starting point. If the queue size limits throughput (below 1MB per second per
-shard), try increasing the queue limit slightly.
+## Kinesis Producer
+
+{{< hint warning >}}
+The old Kinesis sink
`org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is
deprecated and may be removed in a future release of Flink; please use the
[Kinesis Sink]({{<ref
"docs/connectors/datastream/kinesis#kinesis-data-streams-sink">}}) instead.
+{{< /hint >}}
+The new sink uses the [AWS v2 SDK for
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html)
whereas the old sink uses the Kinesis Producer Library. Because of this, the
new Kinesis sink does not support
[aggregation](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation).
## Using Custom Kinesis Endpoints
-It is sometimes desirable to have Flink operate as a consumer or producer
against a Kinesis VPC endpoint or a non-AWS
+It is sometimes desirable to have Flink operate as a source or sink against a
Kinesis VPC endpoint or a non-AWS
Kinesis endpoint such as [Kinesalite](https://github.com/mhart/kinesalite);
this is especially useful when performing
functional testing of a Flink application. The AWS endpoint that would
normally be inferred by the AWS region set in the
Flink configuration must be overridden via a configuration property.
@@ -693,20 +685,20 @@ To override the AWS endpoint, set the
`AWSConfigConstants.AWS_ENDPOINT` and `AWS
{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
{{< tab "Java" >}}
```java
-Properties producerConfig = new Properties();
-producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key");
-producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
+Properties config = new Properties();
+config.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+config.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+config.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+config.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
-val producerConfig = new Properties()
-producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
-producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key")
-producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
+val config = new Properties()
+config.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+config.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+config.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+config.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
```
{{< /tab >}}
{{< /tabs >}}
diff --git a/docs/content/docs/connectors/datastream/kinesis.md
b/docs/content/docs/connectors/datastream/kinesis.md
index a5e48f9..45e53a5 100644
--- a/docs/content/docs/connectors/datastream/kinesis.md
+++ b/docs/content/docs/connectors/datastream/kinesis.md
@@ -589,12 +589,6 @@ For the monitoring to work, the user accessing the stream
needs access to the Cl
{{< tabs "6df3b696-c2ca-4f44-bea0-96cf8275d61c" >}}
{{< tab "Java" >}}
```java
-ElementConverter<String, PutRecordsRequestEntry> elementConverter =
- KinesisDataStreamsSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode()))
- .build();
-
Properties sinkProperties = new Properties();
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1");
@@ -605,16 +599,17 @@
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
KinesisDataStreamsSink<String> kdsSink =
KinesisDataStreamsSink.<String>builder()
- .setKinesisClientProperties(sinkProperties) // Required
- .setElementConverter(elementConverter) // Required
- .setStreamName("your-stream-name") // Required
- .setFailOnError(false) // Optional
- .setMaxBatchSize(500) // Optional
- .setMaxInFlightRequests(16) // Optional
- .setMaxBufferedRequests(10_000) // Optional
- .setMaxBatchSizeInBytes(5 * 1024 * 1024) // Optional
- .setMaxTimeInBufferMS(5000) // Optional
- .setMaxRecordSizeInBytes(1 * 1024 * 1024) // Optional
+ .setKinesisClientProperties(sinkProperties)
// Required
+ .setSerializationSchema(new SimpleStringSchema())
// Required
+ .setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode())) // Required
+ .setStreamName("your-stream-name")
// Required
+ .setFailOnError(false)
// Optional
+ .setMaxBatchSize(500)
// Optional
+ .setMaxInFlightRequests(16)
// Optional
+ .setMaxBufferedRequests(10_000)
// Optional
+ .setMaxBatchSizeInBytes(5 * 1024 * 1024)
// Optional
+ .setMaxTimeInBufferMS(5000)
// Optional
+ .setMaxRecordSizeInBytes(1 * 1024 * 1024)
// Optional
.build();
DataStream<String> simpleStringStream = ...;
@@ -623,12 +618,6 @@ simpleStringStream.sinkTo(kdsSink);
{{< /tab >}}
{{< tab "Scala" >}}
```scala
-val elementConverter =
- KinesisDataStreamsSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode()))
- .build()
-
val sinkProperties = new Properties()
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1")
@@ -638,16 +627,17 @@ sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
"aws_access_key_id")
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key")
val kdsSink = KinesisDataStreamsSink.<String>builder()
- .setKinesisClientProperties(sinkProperties) // Required
- .setElementConverter(elementConverter) // Required
- .setStreamName("your-stream-name") // Required
- .setFailOnError(false) // Optional
- .setMaxBatchSize(500) // Optional
- .setMaxInFlightRequests(16) // Optional
- .setMaxBufferedRequests(10000) // Optional
- .setMaxBatchSizeInBytes(5 * 1024 * 1024) // Optional
- .setMaxTimeInBufferMS(5000) // Optional
- .setMaxRecordSizeInBytes(1 * 1024 * 1024) // Optional
+ .setKinesisClientProperties(sinkProperties)
// Required
+ .setSerializationSchema(new SimpleStringSchema())
// Required
+ .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
// Required
+ .setStreamName("your-stream-name")
// Required
+ .setFailOnError(false)
// Optional
+ .setMaxBatchSize(500)
// Optional
+ .setMaxInFlightRequests(16)
// Optional
+ .setMaxBufferedRequests(10000)
// Optional
+ .setMaxBatchSizeInBytes(5 * 1024 * 1024)
// Optional
+ .setMaxTimeInBufferMS(5000)
// Optional
+ .setMaxRecordSizeInBytes(1 * 1024 * 1024)
// Optional
.build()
val simpleStringStream = ...
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
index ca1bbf1..50ff4c1 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
@@ -17,7 +17,7 @@
package org.apache.flink.connector.kinesis.sink;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
@@ -111,7 +111,7 @@ public class KinesisDataStreamsSink<InputT> extends
AsyncSinkBase<InputT, PutRec
return new KinesisDataStreamsSinkBuilder<>();
}
- @Experimental
+ @Internal
@Override
public SinkWriter<InputT, Void, Collection<PutRecordsRequestEntry>>
createWriter(
InitContext context, List<Collection<PutRecordsRequestEntry>>
states) {
@@ -129,7 +129,7 @@ public class KinesisDataStreamsSink<InputT> extends
AsyncSinkBase<InputT, PutRec
kinesisClientProperties);
}
- @Experimental
+ @Internal
@Override
public
Optional<SimpleVersionedSerializer<Collection<PutRecordsRequestEntry>>>
getWriterStateSerializer() {
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java
index 711ea38..ae57fbd 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.kinesis.sink;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
@@ -32,16 +33,12 @@ import java.util.Properties;
* writes String values to a Kinesis Data Streams stream named
your_stream_here.
*
* <pre>{@code
- * ElementConverter<String, PutRecordsRequestEntry> elementConverter =
- * KinesisDataStreamsSinkElementConverter.<String>builder()
- * .setSerializationSchema(new SimpleStringSchema())
- * .setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode()))
- * .build();
- *
* KinesisDataStreamsSink<String> kdsSink =
* KinesisDataStreamsSink.<String>builder()
* .setElementConverter(elementConverter)
* .setStreamName("your_stream_name")
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode()))
* .build();
* }</pre>
*
@@ -75,6 +72,8 @@ public class KinesisDataStreamsSinkBuilder<InputT>
private Boolean failOnError;
private String streamName;
private Properties kinesisClientProperties;
+ private SerializationSchema<InputT> serializationSchema;
+ private PartitionKeyGenerator<InputT> partitionKeyGenerator;
KinesisDataStreamsSinkBuilder() {}
@@ -91,6 +90,18 @@ public class KinesisDataStreamsSinkBuilder<InputT>
return this;
}
+ public KinesisDataStreamsSinkBuilder<InputT> setSerializationSchema(
+ SerializationSchema<InputT> serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ return this;
+ }
+
+ public KinesisDataStreamsSinkBuilder<InputT> setPartitionKeyGenerator(
+ PartitionKeyGenerator<InputT> partitionKeyGenerator) {
+ this.partitionKeyGenerator = partitionKeyGenerator;
+ return this;
+ }
+
public KinesisDataStreamsSinkBuilder<InputT> setFailOnError(boolean
failOnError) {
this.failOnError = failOnError;
return this;
@@ -105,7 +116,10 @@ public class KinesisDataStreamsSinkBuilder<InputT>
@Override
public KinesisDataStreamsSink<InputT> build() {
return new KinesisDataStreamsSink<>(
- getElementConverter(),
+ new KinesisDataStreamsSinkElementConverter.Builder<InputT>()
+ .setSerializationSchema(serializationSchema)
+ .setPartitionKeyGenerator(partitionKeyGenerator)
+ .build(),
Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
Optional.ofNullable(getMaxInFlightRequests())
.orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
index 6b6a78f..2080f2e 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
@@ -17,8 +17,7 @@
package org.apache.flink.connector.kinesis.sink;
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -27,15 +26,12 @@ import org.apache.flink.util.Preconditions;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
-import java.io.Serializable;
-import java.util.function.Function;
-
/**
* An implementation of the {@link ElementConverter} that uses the AWS Kinesis
SDK v2. The user only
* needs to provide a {@link SerializationSchema} of the {@code InputT} and a
{@link
* PartitionKeyGenerator} lambda to transform the input element into a String.
*/
-@PublicEvolving
+@Internal
public class KinesisDataStreamsSinkElementConverter<InputT>
implements ElementConverter<InputT, PutRecordsRequestEntry> {
@@ -54,7 +50,6 @@ public class KinesisDataStreamsSinkElementConverter<InputT>
this.partitionKeyGenerator = partitionKeyGenerator;
}
- @Experimental
@Override
public PutRecordsRequestEntry apply(InputT element, SinkWriter.Context
context) {
return PutRecordsRequestEntry.builder()
@@ -63,20 +58,11 @@ public class KinesisDataStreamsSinkElementConverter<InputT>
.build();
}
- /**
- * This is a serializable function whose {@code accept()} method specifies
how to convert from
- * an input element to the partition key, a string.
- */
- @PublicEvolving
- @FunctionalInterface
- public interface PartitionKeyGenerator<InputT> extends Function<InputT,
String>, Serializable {}
-
public static <InputT> Builder<InputT> builder() {
return new Builder<>();
}
/** A builder for the KinesisDataStreamsSinkElementConverter. */
- @PublicEvolving
public static class Builder<InputT> {
private SerializationSchema<InputT> serializationSchema;
@@ -94,7 +80,6 @@ public class KinesisDataStreamsSinkElementConverter<InputT>
return this;
}
- @Experimental
public KinesisDataStreamsSinkElementConverter<InputT> build() {
Preconditions.checkNotNull(
serializationSchema,
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator.java
new file mode 100644
index 0000000..2873c05
--- /dev/null
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * This is a serializable function whose {@code apply()} method specifies how
to convert from an
+ * input element to the partition key, a string.
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface PartitionKeyGenerator<InputT> extends Function<InputT,
String>, Serializable {}
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java
index 8a7c0e5..21e4595 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java
@@ -19,7 +19,7 @@
package org.apache.flink.connector.kinesis.table;
import org.apache.flink.annotation.PublicEvolving;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter.PartitionKeyGenerator;
+import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
import org.apache.flink.util.Preconditions;
import java.util.Objects;
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java
index 7a55a73..e4f1037 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter.PartitionKeyGenerator;
+import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDataStreamsConnectorOptionsUtils.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDataStreamsConnectorOptionsUtils.java
index a1a0782..d155e89 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDataStreamsConnectorOptionsUtils.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDataStreamsConnectorOptionsUtils.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.aws.table.util.AWSOptionUtils;
import
org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter;
+import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
import
org.apache.flink.connector.kinesis.table.util.KinesisAsyncClientOptionsUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
@@ -65,7 +65,7 @@ public class KinesisDataStreamsConnectorOptionsUtils {
private final AsyncSinkConfigurationValidator
asyncSinkconfigurationValidator;
private final Map<String, String> resolvedOptions;
private final ReadableConfig tableOptions;
- private final
KinesisDataStreamsSinkElementConverter.PartitionKeyGenerator<RowData>
partitioner;
+ private final PartitionKeyGenerator<RowData> partitioner;
/**
* Prefixes of properties that are validated by downstream components and
should not be
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
index 705b7cd..8bcf656 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
@@ -20,13 +20,11 @@ package org.apache.flink.connector.kinesis.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;
import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder;
import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink;
import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkBuilder;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter.PartitionKeyGenerator;
+import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -106,15 +104,11 @@ public class KinesisDynamicSink extends
AsyncDynamicTableSink<PutRecordsRequestE
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
SerializationSchema<RowData> serializationSchema =
encodingFormat.createRuntimeEncoder(context, consumedDataType);
- ElementConverter<RowData, PutRecordsRequestEntry> elementConverter =
- KinesisDataStreamsSinkElementConverter.<RowData>builder()
- .setSerializationSchema(serializationSchema)
- .setPartitionKeyGenerator(partitioner)
- .build();
KinesisDataStreamsSinkBuilder<RowData> builder =
KinesisDataStreamsSink.<RowData>builder()
- .setElementConverter(elementConverter)
+ .setSerializationSchema(serializationSchema)
+ .setPartitionKeyGenerator(partitioner)
.setKinesisClientProperties(kinesisClientProperties)
.setStreamName(stream);
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
index 6d1ac6e..30056c6 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
@@ -23,7 +23,7 @@ import
org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter;
+import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -89,8 +89,7 @@ public class KinesisDynamicTableSinkFactory extends
AsyncDynamicTableSinkFactory
.setEncodingFormat(encodingFormat)
.setConsumedDataType(physicalDataType)
.setPartitioner(
-
(KinesisDataStreamsSinkElementConverter.PartitionKeyGenerator<RowData>)
- properties.get(SINK_PARTITIONER.key()));
+ (PartitionKeyGenerator<RowData>)
properties.get(SINK_PARTITIONER.key()));
addAsyncOptionsToBuilder(properties, builder);
Optional.ofNullable((Boolean) properties.get(SINK_FAIL_ON_ERROR.key()))
.ifPresent(builder::setFailOnError);
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java
index 7f7168a..552c50b 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.kinesis.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter.PartitionKeyGenerator;
+import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java
index 4f9ca23..280cfd0 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java
@@ -19,7 +19,7 @@
package org.apache.flink.connector.kinesis.table;
import org.apache.flink.annotation.PublicEvolving;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter.PartitionKeyGenerator;
+import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
import java.util.UUID;
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java
index df659f5..52eedfb 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java
@@ -19,7 +19,7 @@
package org.apache.flink.connector.kinesis.table;
import org.apache.flink.annotation.Internal;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter.PartitionKeyGenerator;
+import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.RowType.RowField;
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java
index a796d92..c886fd7 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java
@@ -17,28 +17,25 @@
package org.apache.flink.connector.kinesis.sink;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.assertj.core.api.Assertions;
import org.junit.Test;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
/** Covers construction, defaults and sanity checking of
KinesisDataStreamsSinkBuilder. */
public class KinesisDataStreamsSinkBuilderTest {
- private static final ElementConverter<String, PutRecordsRequestEntry>
- ELEMENT_CONVERTER_PLACEHOLDER =
- KinesisDataStreamsSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode()))
- .build();
+ private static final SerializationSchema<String> SERIALIZATION_SCHEMA =
+ new SimpleStringSchema();
+ private static final PartitionKeyGenerator<String> PARTITION_KEY_GENERATOR
=
+ element -> String.valueOf(element.hashCode());
@Test
public void elementConverterOfSinkMustBeSetWhenBuilt() {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(() ->
KinesisDataStreamsSink.builder().setStreamName("stream").build())
.withMessageContaining(
- "ElementConverter must be not null when initilizing
the AsyncSinkBase.");
+ "No SerializationSchema was supplied to the
KinesisDataStreamsSinkElementConverter builder.");
}
@Test
@@ -47,7 +44,8 @@ public class KinesisDataStreamsSinkBuilderTest {
.isThrownBy(
() ->
KinesisDataStreamsSink.<String>builder()
-
.setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+
.setPartitionKeyGenerator(PARTITION_KEY_GENERATOR)
+
.setSerializationSchema(SERIALIZATION_SCHEMA)
.build())
.withMessageContaining(
"The stream name must not be null when initializing
the KDS Sink.");
@@ -60,9 +58,36 @@ public class KinesisDataStreamsSinkBuilderTest {
() ->
KinesisDataStreamsSink.<String>builder()
.setStreamName("")
-
.setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+
.setPartitionKeyGenerator(PARTITION_KEY_GENERATOR)
+
.setSerializationSchema(SERIALIZATION_SCHEMA)
.build())
.withMessageContaining(
"The stream name must be set when initializing the KDS
Sink.");
}
+
+ @Test
+ public void serializationSchemaMustBeSetWhenSinkIsBuilt() {
+ Assertions.assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(
+ () ->
+ KinesisDataStreamsSink.<String>builder()
+ .setStreamName("stream")
+
.setPartitionKeyGenerator(PARTITION_KEY_GENERATOR)
+ .build())
+ .withMessageContaining(
+ "No SerializationSchema was supplied to the
KinesisDataStreamsSinkElementConverter builder.");
+ }
+
+ @Test
+ public void partitionKeyGeneratorMustBeSetWhenSinkIsBuilt() {
+ Assertions.assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(
+ () ->
+ KinesisDataStreamsSink.<String>builder()
+ .setStreamName("stream")
+
.setSerializationSchema(SERIALIZATION_SCHEMA)
+ .build())
+ .withMessageContaining(
+ "No PartitionKeyGenerator lambda was supplied to the
KinesisDataStreamsSinkElementConverter builder.");
+ }
}
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java
index 06e3418..301f228 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java
@@ -17,9 +17,9 @@
package org.apache.flink.connector.kinesis.sink;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -44,7 +44,6 @@ import
software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
@@ -64,18 +63,10 @@ public class KinesisDataStreamsSinkITCase extends
TestLogger {
private static final String DEFAULT_FIRST_SHARD_NAME =
"shardId-000000000000";
- private final ElementConverter<String, PutRecordsRequestEntry>
elementConverter =
- KinesisDataStreamsSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode()))
- .build();
-
- private final ElementConverter<String, PutRecordsRequestEntry>
- partitionKeyTooLongElementConverter =
- KinesisDataStreamsSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .setPartitionKeyGenerator(element -> element)
- .build();
+ private final SerializationSchema<String> serializationSchema = new
SimpleStringSchema();
+ private final PartitionKeyGenerator<String> partitionKeyGenerator =
+ element -> String.valueOf(element.hashCode());
+ private final PartitionKeyGenerator<String> longPartitionKeyGenerator =
element -> element;
@ClassRule
public static final KinesaliteContainer KINESALITE =
@@ -174,7 +165,8 @@ public class KinesisDataStreamsSinkITCase extends
TestLogger {
.withExpectedElements(5)
.withKinesaliteStreamName("test-stream-name-7")
.withSinkConnectionStreamName("test-stream-name-7")
-
.withElementConverter(partitionKeyTooLongElementConverter)
+
.withSerializationSchema(serializationSchema)
+
.withPartitionKeyGenerator(longPartitionKeyGenerator)
.runScenario())
.havingCause()
.havingCause()
@@ -192,8 +184,10 @@ public class KinesisDataStreamsSinkITCase extends
TestLogger {
private boolean failOnError = false;
private String kinesaliteStreamName;
private String sinkConnectionStreamName;
- private ElementConverter<String, PutRecordsRequestEntry>
elementConverter =
- KinesisDataStreamsSinkITCase.this.elementConverter;
+ private SerializationSchema<String> serializationSchema =
+ KinesisDataStreamsSinkITCase.this.serializationSchema;
+ private PartitionKeyGenerator<String> partitionKeyGenerator =
+ KinesisDataStreamsSinkITCase.this.partitionKeyGenerator;
public void runScenario() throws Exception {
prepareStream(kinesaliteStreamName);
@@ -216,7 +210,8 @@ public class KinesisDataStreamsSinkITCase extends
TestLogger {
KinesisDataStreamsSink<String> kdsSink =
KinesisDataStreamsSink.<String>builder()
- .setElementConverter(elementConverter)
+ .setSerializationSchema(serializationSchema)
+ .setPartitionKeyGenerator(partitionKeyGenerator)
.setMaxTimeInBufferMS(bufferMaxTimeMS)
.setMaxInFlightRequests(maxInflightReqs)
.setMaxBatchSize(maxBatchSize)
@@ -299,9 +294,14 @@ public class KinesisDataStreamsSinkITCase extends
TestLogger {
return this;
}
- public Scenario withElementConverter(
- ElementConverter<String, PutRecordsRequestEntry>
elementConverter) {
- this.elementConverter = elementConverter;
+ public Scenario withSerializationSchema(SerializationSchema<String>
serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ return this;
+ }
+
+ public Scenario withPartitionKeyGenerator(
+ PartitionKeyGenerator<String> partitionKeyGenerator) {
+ this.partitionKeyGenerator = partitionKeyGenerator;
return this;
}
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
index 72edf40..e17afa2 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
+++
b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
@@ -19,16 +19,13 @@ package org.apache.flink.connector.kinesis.sink.examples;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.utils.ImmutableMap;
import java.util.Properties;
@@ -43,12 +40,6 @@ import java.util.Properties;
*/
public class SinkIntoKinesis {
- private static final ElementConverter<String, PutRecordsRequestEntry>
elementConverter =
- KinesisDataStreamsSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode()))
- .build();
-
public static void main(String[] args) throws Exception {
ObjectMapper mapper = new ObjectMapper();
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -65,7 +56,8 @@ public class SinkIntoKinesis {
KinesisDataStreamsSink<String> kdsSink =
KinesisDataStreamsSink.<String>builder()
- .setElementConverter(elementConverter)
+ .setSerializationSchema(new SimpleStringSchema())
+ .setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode()))
.setStreamName("your-stream-name")
.setMaxBatchSize(20)
.setKinesisClientProperties(sinkProperties)
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
index 73abd02..8048799 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
@@ -70,7 +70,7 @@ public abstract class AsyncSinkBase<InputT, RequestEntryT
extends Serializable>
this.elementConverter =
Preconditions.checkNotNull(
elementConverter,
- "ElementConverter must be not null when initilizing
the AsyncSinkBase.");
+ "ElementConverter must be not null when initializing
the AsyncSinkBase.");
this.maxBatchSize = maxBatchSize;
this.maxInFlightRequests = maxInFlightRequests;
this.maxBufferedRequests = maxBufferedRequests;
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBaseBuilder.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBaseBuilder.java
index ada0bd3..743442f 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBaseBuilder.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBaseBuilder.java
@@ -18,7 +18,6 @@
package org.apache.flink.connector.base.sink;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
import java.io.Serializable;
@@ -36,7 +35,6 @@ public abstract class AsyncSinkBaseBuilder<
RequestEntryT extends Serializable,
ConcreteBuilderT extends AsyncSinkBaseBuilder<?, ?, ?>> {
- private ElementConverter<InputT, RequestEntryT> elementConverter;
private Integer maxBatchSize;
private Integer maxInFlightRequests;
private Integer maxBufferedRequests;
@@ -45,16 +43,6 @@ public abstract class AsyncSinkBaseBuilder<
private Long maxRecordSizeInBytes;
/**
- * @param elementConverter the {@link ElementConverter} to be used for the
sink
- * @return {@link ConcreteBuilderT} itself
- */
- public ConcreteBuilderT setElementConverter(
- ElementConverter<InputT, RequestEntryT> elementConverter) {
- this.elementConverter = elementConverter;
- return (ConcreteBuilderT) this;
- }
-
- /**
* @param maxBatchSize maximum number of elements that may be passed in a
list to be written
* downstream.
* @return {@link ConcreteBuilderT} itself
@@ -127,10 +115,6 @@ public abstract class AsyncSinkBaseBuilder<
/** Builds the Sink with the settings applied to this builder. */
public abstract AsyncSinkBase<InputT, RequestEntryT> build();
- protected ElementConverter<InputT, RequestEntryT> getElementConverter() {
- return elementConverter;
- }
-
protected Integer getMaxBatchSize() {
return maxBatchSize;
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
index fe8b275..c93c9da 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.connectors.kinesis;
import org.apache.flink.annotation.PublicEvolving;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter.PartitionKeyGenerator;
+import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
import java.io.Serializable;