This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3f06f97 [FLINK-26373][connector/kinesis] Rename KinesisDataStreams
module and classes into KinesisStreams
3f06f97 is described below
commit 3f06f97505ff57c9f21ae6c0b8544cc7432f2825
Author: Ahmed Hamdy <[email protected]>
AuthorDate: Fri Feb 25 15:30:32 2022 +0000
[FLINK-26373][connector/kinesis] Rename KinesisDataStreams module and
classes into KinesisStreams
---
.../docs/connectors/datastream/kinesis.md | 18 ++++++------
docs/content.zh/docs/connectors/table/kinesis.md | 8 +++---
docs/content/docs/connectors/datastream/kinesis.md | 20 +++++++-------
docs/content/docs/connectors/table/kinesis.md | 8 +++---
.../flink-architecture-tests-production/pom.xml | 2 +-
flink-architecture-tests/pom.xml | 2 +-
.../flink-connector-aws-kinesis-firehose/pom.xml | 2 +-
.../firehose/table/KinesisFirehoseDynamicSink.java | 15 +++++-----
.../table/KinesisFirehoseDynamicTableFactory.java | 4 +--
.../KinesisFirehoseDynamicTableFactoryTest.java | 7 ++---
.../75596a92-3816-4a44-85ac-7c96e72f443a} | 0
.../7e2560a3-23eb-40cc-8669-e7943e393b88 | 0
.../84abeb9c-8355-4165-96aa-dda65b04e5e7 | 4 +--
.../archunit-violations/stored.rules | 0
.../pom.xml | 4 +--
.../sink/KinesisStreamsConfigConstants.java} | 4 +--
.../kinesis/sink/KinesisStreamsException.java} | 16 +++++------
.../kinesis/sink/KinesisStreamsSink.java} | 22 +++++++--------
.../kinesis/sink/KinesisStreamsSinkBuilder.java} | 32 +++++++++++-----------
.../sink/KinesisStreamsSinkElementConverter.java} | 14 +++++-----
.../kinesis/sink/KinesisStreamsSinkWriter.java} | 27 ++++++++----------
.../sink/KinesisStreamsStateSerializer.java} | 4 +--
.../kinesis/sink/PartitionKeyGenerator.java | 0
.../table/FixedKinesisPartitionKeyGenerator.java | 0
.../kinesis/table/KinesisConnectorOptions.java | 0
.../kinesis/table/KinesisDynamicSink.java | 10 +++----
.../table/KinesisDynamicTableSinkFactory.java | 12 ++++----
.../table/KinesisPartitionKeyGeneratorFactory.java | 0
.../table/RandomKinesisPartitionKeyGenerator.java | 0
.../RowDataFieldsKinesisPartitionKeyGenerator.java | 0
.../util/KinesisStreamsConnectorOptionsUtils.java} | 4 +--
.../org.apache.flink.table.factories.Factory | 0
.../src/main/resources/log4j2.properties | 0
.../architecture/TestCodeArchitectureTest.java | 0
.../sink/KinesisStreamsSinkBuilderTest.java} | 20 +++++++-------
.../kinesis/sink/KinesisStreamsSinkITCase.java} | 12 ++++----
.../sink/KinesisStreamsStateSerializerTest.java} | 8 +++---
.../kinesis/sink/examples/SinkIntoKinesis.java | 9 +++---
.../table/KinesisDynamicTableSinkFactoryTest.java | 10 +++----
...DataFieldsKinesisPartitionKeyGeneratorTest.java | 0
.../util/KinesisProducerOptionsMapperTest.java | 4 +--
.../kinesis/testutils/KinesaliteContainer.java | 0
.../src/test/resources/archunit.properties | 0
.../src/test/resources/log4j2-test.properties | 0
.../src/test/resources/profile | 0
flink-connectors/flink-connector-kinesis/pom.xml | 8 +++---
.../connectors/kinesis/FlinkKinesisProducer.java | 8 +++---
.../kinesis/proxy/KinesisProxyV2Factory.java | 10 +++----
.../streaming/connectors/kinesis/util/AWSUtil.java | 4 +--
.../pom.xml | 2 +-
.../pom.xml | 10 +++----
.../src/main/resources/META-INF/NOTICE | 2 +-
flink-connectors/pom.xml | 4 +--
.../pom.xml | 6 ++--
.../table/test/KinesisStreamsTableApiIT.java} | 9 +++---
.../src/test/resources/log4j2-test.properties | 0
.../src/test/resources/send-orders.sql | 0
.../flink-glue-schema-registry-json-test/pom.xml | 2 +-
.../flink-streaming-kinesis-test/pom.xml | 2 +-
flink-end-to-end-tests/pom.xml | 2 +-
pom.xml | 2 +-
tools/ci/stage.sh | 2 +-
62 files changed, 184 insertions(+), 191 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/kinesis.md
b/docs/content.zh/docs/connectors/datastream/kinesis.md
index 500db81..cc0661e 100644
--- a/docs/content.zh/docs/connectors/datastream/kinesis.md
+++ b/docs/content.zh/docs/connectors/datastream/kinesis.md
@@ -566,9 +566,9 @@ Retry and backoff parameters can be configured using the
`ConsumerConfigConstant
this is called once per stream during stream consumer deregistration, unless
the `NONE` or `EAGER` registration strategy is configured.
Retry and backoff parameters can be configured using the
`ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.
-## Kinesis Data Streams Sink
+## Kinesis Streams Sink
-The Kinesis Data Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK
for
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html)
to write data from a Flink stream into a Kinesis stream.
+The Kinesis Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html)
to write data from a Flink stream into a Kinesis stream.
To write data into a Kinesis stream, make sure the stream is marked as
"ACTIVE" in the Amazon Kinesis Data Stream console.
@@ -585,8 +585,8 @@ sinkProperties.put(AWSConfigConstants.AWS_REGION,
"us-east-1");
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key");
-KinesisDataStreamsSink<String> kdsSink =
- KinesisDataStreamsSink.<String>builder()
+KinesisStreamsSink<String> kdsSink =
+ KinesisStreamsSink.<String>builder()
.setKinesisClientProperties(sinkProperties)
// Required
.setSerializationSchema(new SimpleStringSchema())
// Required
.setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode())) // Required
@@ -614,7 +614,7 @@ sinkProperties.put(AWSConfigConstants.AWS_REGION,
"us-east-1")
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key")
-val kdsSink = KinesisDataStreamsSink.<String>builder()
+val kdsSink = KinesisStreamsSink.<String>builder()
.setKinesisClientProperties(sinkProperties)
// Required
.setSerializationSchema(new SimpleStringSchema())
// Required
.setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
// Required
@@ -636,7 +636,7 @@ simpleStringStream.sinkTo(kdsSink)
The above is a simple example of using the Kinesis sink. Begin by creating a
`java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and
`AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the
builder. The default values for the optional configurations are shown above.
Some of these values have been set as a result of [configuration on
KDS](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
-You will always need to supply a `KinesisDataStreamsSinkElementConverter`
during sink creation. This is where you specify your serialization schema and
logic for generating a [partition
key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key)
from a record.
+You will always need to specify your serialization schema and logic for
generating a [partition
key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key)
from a record.
Some or all of the records in a request may fail to be persisted by Kinesis
Data Streams for a number of reasons. If `failOnError` is on, then a runtime
exception will be raised. Otherwise those records will be requeued in the
buffer for retry.
@@ -658,8 +658,8 @@ found at [Quotas and
Limits](https://docs.aws.amazon.com/streams/latest/dev/serv
You generally reduce backpressure by increasing the size of the internal queue:
```
-KinesisDataStreamsSink<String> kdsSink =
- KinesisDataStreamsSink.<String>builder()
+KinesisStreamsSink<String> kdsSink =
+ KinesisStreamsSink.<String>builder()
...
.setMaxBufferedRequests(10_000)
...
@@ -668,7 +668,7 @@ KinesisDataStreamsSink<String> kdsSink =
## Kinesis Producer
{{< hint warning >}}
-The old Kinesis sink
`org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is
deprecated and may be removed with a future release of Flink, please use
[Kinesis Sink]({{<ref
"docs/connectors/datastream/kinesis#kinesis-data-streams-sink">}}) instead.
+The old Kinesis sink
`org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is
deprecated and may be removed with a future release of Flink, please use
[Kinesis Sink]({{<ref
"docs/connectors/datastream/kinesis#kinesis-streams-sink">}}) instead.
{{< /hint >}}
The new sink uses the [AWS v2 SDK for
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html)
whereas the old sink uses the Kinesis Producer Library. Because of this, the
new Kinesis sink does not support
[aggregation](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation).
diff --git a/docs/content.zh/docs/connectors/table/kinesis.md
b/docs/content.zh/docs/connectors/table/kinesis.md
index b216acd..93d7657 100644
--- a/docs/content.zh/docs/connectors/table/kinesis.md
+++ b/docs/content.zh/docs/connectors/table/kinesis.md
@@ -637,7 +637,7 @@ Connector Options
<td></td>
<td>
Deprecated options previously used by the legacy connector.
- Options with equivalant alternatives in
<code>KinesisDataStreamsSink</code> are matched
+ Options with equivalant alternatives in
<code>KinesisStreamsSink</code> are matched
to their respective properties. Unsupported options are logged out to
user as warnings.
</td>
</tr>
@@ -809,11 +809,11 @@ Please refer to the [Formats]({{< ref
"docs/connectors/table/formats/overview" >
# Updates in 1.15
-Kinesis table API connector sink data stream depends on
<code>FlinkKinesisProducer</code> till 1.14, with the introduction of
<code>KinesisDataStreamsSink</code> in 1.15 kinesis table API sink connector
has been migrated to the new <code>KinesisDataStreamsSink</code>.
Authentication options have been migrated identically while sink configuration
options are now compatible with <code>KinesisDataStreamsSink</code>.
+Kinesis table API connector sink data stream depends on
<code>FlinkKinesisProducer</code> till 1.14, with the introduction of
<code>KinesisStreamsSink</code> in 1.15 kinesis table API sink connector has
been migrated to the new <code>KinesisStreamsSink</code>. Authentication
options have been migrated identically while sink configuration options are now
compatible with <code>KinesisStreamsSink</code>.
-Options configuring <code>FlinkKinesisProducer</code> are now deprecated with
fallback support for common configuration options with
<code>KinesisDataStreamsSink</code>.
+Options configuring <code>FlinkKinesisProducer</code> are now deprecated with
fallback support for common configuration options with
<code>KinesisStreamsSink</code>.
-<code>KinesisDataStreamsSink</code> uses <code>KinesisAsyncClient</code> to
send records to kinesis,
+<code>KinesisStreamsSink</code> uses <code>KinesisAsyncClient</code> to send
records to kinesis,
which doesn't support aggregation. In consequence, table options configuring
aggregation in the deprecated <code>FlinkKinesisProducer</code>
are now deprecated and will be ignored, this includes
<code>sink.producer.aggregation-enabled</code> and
<code>sink.producer.aggregation-count</code>.
diff --git a/docs/content/docs/connectors/datastream/kinesis.md
b/docs/content/docs/connectors/datastream/kinesis.md
index f91821b..fbc515d 100644
--- a/docs/content/docs/connectors/datastream/kinesis.md
+++ b/docs/content/docs/connectors/datastream/kinesis.md
@@ -45,7 +45,7 @@ To use this connector, add one or more of the following
dependencies to your pro
</tr>
<tr>
<td>Sink</td>
- <td>{{< artifact flink-connector-aws-kinesis-data-streams >}}</td>
+ <td>{{< artifact flink-connector-aws-kinesis-streams >}}</td>
</tr>
</tbody>
</table>
@@ -578,9 +578,9 @@ Retry and backoff parameters can be configured using the
`ConsumerConfigConstant
this is called once per stream during stream consumer deregistration, unless
the `NONE` or `EAGER` registration strategy is configured.
Retry and backoff parameters can be configured using the
`ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.
-## Kinesis Data Streams Sink
+## Kinesis Streams Sink
-The Kinesis Data Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK
for
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html)
to write data from a Flink stream into a Kinesis stream.
+The Kinesis Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html)
to write data from a Flink stream into a Kinesis stream.
To write data into a Kinesis stream, make sure the stream is marked as
"ACTIVE" in the Amazon Kinesis Data Stream console.
@@ -597,8 +597,8 @@ sinkProperties.put(AWSConfigConstants.AWS_REGION,
"us-east-1");
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key");
-KinesisDataStreamsSink<String> kdsSink =
- KinesisDataStreamsSink.<String>builder()
+KinesisStreamsSink<String> kdsSink =
+ KinesisStreamsSink.<String>builder()
.setKinesisClientProperties(sinkProperties)
// Required
.setSerializationSchema(new SimpleStringSchema())
// Required
.setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode())) // Required
@@ -626,7 +626,7 @@ sinkProperties.put(AWSConfigConstants.AWS_REGION,
"us-east-1")
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"aws_secret_access_key")
-val kdsSink = KinesisDataStreamsSink.<String>builder()
+val kdsSink = KinesisStreamsSink.<String>builder()
.setKinesisClientProperties(sinkProperties)
// Required
.setSerializationSchema(new SimpleStringSchema())
// Required
.setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
// Required
@@ -648,7 +648,7 @@ simpleStringStream.sinkTo(kdsSink)
The above is a simple example of using the Kinesis sink. Begin by creating a
`java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and
`AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the
builder. The default values for the optional configurations are shown above.
Some of these values have been set as a result of [configuration on
KDS](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
-You will always need to supply a `KinesisDataStreamsSinkElementConverter`
during sink creation. This is where you specify your serialization schema and
logic for generating a [partition
key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key)
from a record.
+You will always need to specify your serialization schema and logic for
generating a [partition
key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key)
from a record.
Some or all of the records in a request may fail to be persisted by Kinesis
Data Streams for a number of reasons. If `failOnError` is on, then a runtime
exception will be raised. Otherwise those records will be requeued in the
buffer for retry.
@@ -670,8 +670,8 @@ found at [Quotas and
Limits](https://docs.aws.amazon.com/streams/latest/dev/serv
You generally reduce backpressure by increasing the size of the internal queue:
```
-KinesisDataStreamsSink<String> kdsSink =
- KinesisDataStreamsSink.<String>builder()
+KinesisStreamsSink<String> kdsSink =
+ KinesisStreamsSink.<String>builder()
...
.setMaxBufferedRequests(10_000)
...
@@ -680,7 +680,7 @@ KinesisDataStreamsSink<String> kdsSink =
## Kinesis Producer
{{< hint warning >}}
-The old Kinesis sink
`org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is
deprecated and may be removed with a future release of Flink, please use
[Kinesis Sink]({{<ref
"docs/connectors/datastream/kinesis#kinesis-data-streams-sink">}}) instead.
+The old Kinesis sink
`org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is
deprecated and may be removed with a future release of Flink, please use
[Kinesis Sink]({{<ref
"docs/connectors/datastream/kinesis#kinesis-streams-sink">}}) instead.
{{< /hint >}}
The new sink uses the [AWS v2 SDK for
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html)
whereas the old sink uses the Kinesis Producer Library. Because of this, the
new Kinesis sink does not support
[aggregation](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation).
diff --git a/docs/content/docs/connectors/table/kinesis.md
b/docs/content/docs/connectors/table/kinesis.md
index cce4caa..6a69b89 100644
--- a/docs/content/docs/connectors/table/kinesis.md
+++ b/docs/content/docs/connectors/table/kinesis.md
@@ -709,7 +709,7 @@ Connector Options
<td></td>
<td>
Deprecated options previously used by the legacy connector.
- Options with equivalant alternatives in
<code>KinesisDataStreamsSink</code> are matched
+ Options with equivalant alternatives in
<code>KinesisStreamsSink</code> are matched
to their respective properties. Unsupported options are logged out to
user as warnings.
</td>
</tr>
@@ -881,11 +881,11 @@ Please refer to the [Formats]({{< ref
"docs/connectors/table/formats/overview" >
# Updates in 1.15
-Kinesis table API connector sink data stream depends on
<code>FlinkKinesisProducer</code> till 1.14, with the introduction of
<code>KinesisDataStreamsSink</code> in 1.15 kinesis table API sink connector
has been migrated to the new <code>KinesisDataStreamsSink</code>.
Authentication options have been migrated identically while sink configuration
options are now compatible with <code>KinesisDataStreamsSink</code>.
+Kinesis table API connector sink data stream depends on
<code>FlinkKinesisProducer</code> till 1.14, with the introduction of
<code>KinesisStreamsSink</code> in 1.15 kinesis table API sink connector has
been migrated to the new <code>KinesisStreamsSink</code>. Authentication
options have been migrated identically while sink configuration options are now
compatible with <code>KinesisStreamsSink</code>.
-Options configuring <code>FlinkKinesisProducer</code> are now deprecated with
fallback support for common configuration options with
<code>KinesisDataStreamsSink</code>.
+Options configuring <code>FlinkKinesisProducer</code> are now deprecated with
fallback support for common configuration options with
<code>KinesisStreamsSink</code>.
-<code>KinesisDataStreamsSink</code> uses <code>KinesisAsyncClient</code> to
send records to kinesis,
+<code>KinesisStreamsSink</code> uses <code>KinesisAsyncClient</code> to send
records to kinesis,
which doesn't support aggregation. In consequence, table options configuring
aggregation in the deprecated <code>FlinkKinesisProducer</code>
are now deprecated and will be ignored, this includes
<code>sink.producer.aggregation-enabled</code> and
<code>sink.producer.aggregation-count</code>.
diff --git
a/flink-architecture-tests/flink-architecture-tests-production/pom.xml
b/flink-architecture-tests/flink-architecture-tests-production/pom.xml
index 751a056..4752e9b 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/pom.xml
+++ b/flink-architecture-tests/flink-architecture-tests-production/pom.xml
@@ -174,7 +174,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
+
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
</dependency>
<dependency>
diff --git a/flink-architecture-tests/pom.xml b/flink-architecture-tests/pom.xml
index d154a50..788b158 100644
--- a/flink-architecture-tests/pom.xml
+++ b/flink-architecture-tests/pom.xml
@@ -219,7 +219,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
+
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml
b/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml
index bf0a25a..7463999 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml
@@ -31,7 +31,7 @@ under the License.
</parent>
<artifactId>flink-connector-aws-kinesis-firehose</artifactId>
- <name>Flink : Connectors : AWS Kinesis Data Firehose</name>
+ <name>Flink : Connectors : Amazon Kinesis Data Firehose</name>
<properties>
<aws.sdk.version>2.17.52</aws.sdk.version>
</properties>
diff --git
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
index 5b6358b..149dbaf 100644
---
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
+++
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
@@ -129,8 +129,8 @@ public class KinesisFirehoseDynamicSink extends
AsyncDynamicTableSink<Record> {
/** Builder class for {@link KinesisFirehoseDynamicSink}. */
@Internal
- public static class KinesisDataFirehoseDynamicSinkBuilder
- extends AsyncDynamicTableSinkBuilder<Record,
KinesisDataFirehoseDynamicSinkBuilder> {
+ public static class KinesisFirehoseDynamicSinkBuilder
+ extends AsyncDynamicTableSinkBuilder<Record,
KinesisFirehoseDynamicSinkBuilder> {
private DataType consumedDataType = null;
private String deliveryStream = null;
@@ -138,30 +138,29 @@ public class KinesisFirehoseDynamicSink extends
AsyncDynamicTableSink<Record> {
private EncodingFormat<SerializationSchema<RowData>> encodingFormat =
null;
private Boolean failOnError = null;
- public KinesisDataFirehoseDynamicSinkBuilder setConsumedDataType(
- DataType consumedDataType) {
+ public KinesisFirehoseDynamicSinkBuilder setConsumedDataType(DataType
consumedDataType) {
this.consumedDataType = consumedDataType;
return this;
}
- public KinesisDataFirehoseDynamicSinkBuilder setDeliveryStream(String
deliveryStream) {
+ public KinesisFirehoseDynamicSinkBuilder setDeliveryStream(String
deliveryStream) {
this.deliveryStream = deliveryStream;
return this;
}
- public KinesisDataFirehoseDynamicSinkBuilder
setFirehoseClientProperties(
+ public KinesisFirehoseDynamicSinkBuilder setFirehoseClientProperties(
Properties firehoseClientProperties) {
this.firehoseClientProperties = firehoseClientProperties;
return this;
}
- public KinesisDataFirehoseDynamicSinkBuilder setEncodingFormat(
+ public KinesisFirehoseDynamicSinkBuilder setEncodingFormat(
EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
this.encodingFormat = encodingFormat;
return this;
}
- public KinesisDataFirehoseDynamicSinkBuilder setFailOnError(Boolean
failOnError) {
+ public KinesisFirehoseDynamicSinkBuilder setFailOnError(Boolean
failOnError) {
this.failOnError = failOnError;
return this;
}
diff --git
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java
index f9a8e91..a7ca38e 100644
---
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java
+++
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java
@@ -45,8 +45,8 @@ public class KinesisFirehoseDynamicTableFactory extends
AsyncDynamicTableSinkFac
AsyncDynamicSinkContext factoryContext = new
AsyncDynamicSinkContext(this, context);
- KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder
builder =
- new
KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder();
+ KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder builder =
+ new
KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder();
KinesisFirehoseConnectorOptionUtils optionsUtils =
new KinesisFirehoseConnectorOptionUtils(
diff --git
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
index 8c45883..ac5292e 100644
---
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
+++
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
@@ -57,7 +57,7 @@ public class KinesisFirehoseDynamicTableFactoryTest extends
TestLogger {
// Construct expected DynamicTableSink using factory under test
KinesisFirehoseDynamicSink expectedSink =
- new
KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder()
+ new
KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder()
.setConsumedDataType(sinkSchema.toPhysicalRowDataType())
.setDeliveryStream(DELIVERY_STREAM_NAME)
.setFirehoseClientProperties(defaultSinkProperties())
@@ -134,9 +134,8 @@ public class KinesisFirehoseDynamicTableFactoryTest extends
TestLogger {
.withFormatOption(TestFormatFactory.FAIL_ON_MISSING, "true");
}
- private KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder
- getDefaultSinkBuilder() {
- return new
KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder()
+ private KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder
getDefaultSinkBuilder() {
+ return new
KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder()
.setFailOnError(true)
.setMaxBatchSize(100)
.setMaxInFlightRequests(100)
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/7e2560a3-23eb-40cc-8669-e7943e393b88
b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/75596a92-3816-4a44-85ac-7c96e72f443a
similarity index 100%
copy from
flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/7e2560a3-23eb-40cc-8669-e7943e393b88
copy to
flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/75596a92-3816-4a44-85ac-7c96e72f443a
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/7e2560a3-23eb-40cc-8669-e7943e393b88
b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/7e2560a3-23eb-40cc-8669-e7943e393b88
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/7e2560a3-23eb-40cc-8669-e7943e393b88
rename to
flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/7e2560a3-23eb-40cc-8669-e7943e393b88
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
similarity index 86%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
rename to
flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
index a370e4c..202fdf1 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
@@ -1,6 +1,6 @@
-org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkITCase does not
satisfy: only one of the following predicates match:\
+org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkITCase does not
satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that
are static, final, and of type InternalMiniClusterExtension and annotated with
@RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any
fields that are static, final, and of type MiniClusterExtension and annotated
with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with
@ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with
@ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type
MiniClusterWithClientResource and final and annotated with @ClassRule or
contain any fields that is of type MiniClusterWithClientResource and public and
final and not static and annotated with @Rule
\ No newline at end of file
+ or contain any fields that are public, static, and of type
MiniClusterWithClientResource and final and annotated with @ClassRule or
contain any fields that is of type MiniClusterWithClientResource and public and
final and not static and annotated with @Rule
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/stored.rules
b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/stored.rules
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/stored.rules
rename to
flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/stored.rules
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
b/flink-connectors/flink-connector-aws-kinesis-streams/pom.xml
similarity index 97%
rename from flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
rename to flink-connectors/flink-connector-aws-kinesis-streams/pom.xml
index 8e4b99b..7c864ef 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/pom.xml
@@ -30,8 +30,8 @@ under the License.
<relativePath>..</relativePath>
</parent>
- <artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
- <name>Flink : Connectors : AWS Kinesis Data Streams</name>
+ <artifactId>flink-connector-aws-kinesis-streams</artifactId>
+ <name>Flink : Connectors : Amazon Kinesis Data Streams</name>
<properties>
<aws.sdk.version>2.17.52</aws.sdk.version>
</properties>
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsConfigConstants.java
similarity index 91%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsConfigConstants.java
index a5a6020..338e52a 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsConfigConstants.java
@@ -19,9 +19,9 @@ package org.apache.flink.connector.kinesis.sink;
import org.apache.flink.annotation.PublicEvolving;
-/** Defaults for {@link KinesisDataStreamsSinkWriter}. */
+/** Defaults for {@link KinesisStreamsSinkWriter}. */
@PublicEvolving
-public class KinesisDataStreamsConfigConstants {
+public class KinesisStreamsConfigConstants {
public static final String BASE_KINESIS_USER_AGENT_PREFIX_FORMAT =
"Apache Flink %s (%s) Kinesis Connector";
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsException.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsException.java
similarity index 71%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsException.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsException.java
index 3ab30e1..696cbd8 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsException.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsException.java
@@ -21,30 +21,30 @@ package org.apache.flink.connector.kinesis.sink;
* A {@link RuntimeException} wrapper indicating the exception was thrown from
the Kinesis Data
* Streams Sink.
*/
-class KinesisDataStreamsException extends RuntimeException {
+class KinesisStreamsException extends RuntimeException {
- public KinesisDataStreamsException(final String message) {
+ public KinesisStreamsException(final String message) {
super(message);
}
- public KinesisDataStreamsException(final String message, final Throwable
cause) {
+ public KinesisStreamsException(final String message, final Throwable
cause) {
super(message, cause);
}
/**
- * When the flag {@code failOnError} is set in {@link
KinesisDataStreamsSinkWriter}, this
- * exception is raised as soon as any exception occurs when KDS is written
to.
+ * When the flag {@code failOnError} is set in {@link
KinesisStreamsSinkWriter}, this exception
+ * is raised as soon as any exception occurs when KDS is written to.
*/
- static class KinesisDataStreamsFailFastException extends
KinesisDataStreamsException {
+ static class KinesisStreamsFailFastException extends
KinesisStreamsException {
private static final String ERROR_MESSAGE =
"Encountered an exception while persisting records, not
retrying due to {failOnError} being set.";
- public KinesisDataStreamsFailFastException() {
+ public KinesisStreamsFailFastException() {
super(ERROR_MESSAGE);
}
- public KinesisDataStreamsFailFastException(final Throwable cause) {
+ public KinesisStreamsFailFastException(final Throwable cause) {
super(ERROR_MESSAGE, cause);
}
}
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java
similarity index 89%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java
index d8288f7..b6cef5a 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java
@@ -59,18 +59,18 @@ import java.util.Properties;
* Streams, the job will fail immediately if failOnError is set
* </ul>
*
- * <p>Please see the writer implementation in {@link
KinesisDataStreamsSinkWriter}
+ * <p>Please see the writer implementation in {@link KinesisStreamsSinkWriter}
*
* @param <InputT> Type of the elements handled by this sink
*/
@PublicEvolving
-public class KinesisDataStreamsSink<InputT> extends AsyncSinkBase<InputT,
PutRecordsRequestEntry> {
+public class KinesisStreamsSink<InputT> extends AsyncSinkBase<InputT,
PutRecordsRequestEntry> {
private final boolean failOnError;
private final String streamName;
private final Properties kinesisClientProperties;
- KinesisDataStreamsSink(
+ KinesisStreamsSink(
ElementConverter<InputT, PutRecordsRequestEntry> elementConverter,
Integer maxBatchSize,
Integer maxInFlightRequests,
@@ -101,21 +101,21 @@ public class KinesisDataStreamsSink<InputT> extends
AsyncSinkBase<InputT, PutRec
}
/**
- * Create a {@link KinesisDataStreamsSinkBuilder} to allow the fluent
construction of a new
- * {@code KinesisDataStreamsSink}.
+ * Create a {@link KinesisStreamsSinkBuilder} to allow the fluent
construction of a new {@code
+ * KinesisStreamsSink}.
*
* @param <InputT> type of incoming records
- * @return {@link KinesisDataStreamsSinkBuilder}
+ * @return {@link KinesisStreamsSinkBuilder}
*/
- public static <InputT> KinesisDataStreamsSinkBuilder<InputT> builder() {
- return new KinesisDataStreamsSinkBuilder<>();
+ public static <InputT> KinesisStreamsSinkBuilder<InputT> builder() {
+ return new KinesisStreamsSinkBuilder<>();
}
@Internal
@Override
public StatefulSinkWriter<InputT,
BufferedRequestState<PutRecordsRequestEntry>> createWriter(
InitContext context) throws IOException {
- return new KinesisDataStreamsSinkWriter<>(
+ return new KinesisStreamsSinkWriter<>(
getElementConverter(),
context,
getMaxBatchSize(),
@@ -134,7 +134,7 @@ public class KinesisDataStreamsSink<InputT> extends
AsyncSinkBase<InputT, PutRec
@Override
public
SimpleVersionedSerializer<BufferedRequestState<PutRecordsRequestEntry>>
getWriterStateSerializer() {
- return new KinesisDataStreamsStateSerializer();
+ return new KinesisStreamsStateSerializer();
}
@Internal
@@ -143,7 +143,7 @@ public class KinesisDataStreamsSink<InputT> extends
AsyncSinkBase<InputT, PutRec
InitContext context,
Collection<BufferedRequestState<PutRecordsRequestEntry>>
recoveredState)
throws IOException {
- return new KinesisDataStreamsSinkWriter<>(
+ return new KinesisStreamsSinkWriter<>(
getElementConverter(),
context,
getMaxBatchSize(),
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java
similarity index 83%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java
index 20b960f..3e6f7ec 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java
@@ -27,14 +27,14 @@ import java.util.Optional;
import java.util.Properties;
/**
- * Builder to construct {@link KinesisDataStreamsSink}.
+ * Builder to construct {@link KinesisStreamsSink}.
*
- * <p>The following example shows the minimum setup to create a {@link
KinesisDataStreamsSink} that
+ * <p>The following example shows the minimum setup to create a {@link
KinesisStreamsSink} that
* writes String values to a Kinesis Data Streams stream named
your_stream_here.
*
* <pre>{@code
- * KinesisDataStreamsSink<String> kdsSink =
- * KinesisDataStreamsSink.<String>builder()
+ * KinesisStreamsSink<String> kdsSink =
+ * KinesisStreamsSink.<String>builder()
* .setElementConverter(elementConverter)
* .setStreamName("your_stream_name")
* .setSerializationSchema(new SimpleStringSchema())
@@ -57,9 +57,9 @@ import java.util.Properties;
* @param <InputT> type of elements that should be persisted in the destination
*/
@PublicEvolving
-public class KinesisDataStreamsSinkBuilder<InputT>
+public class KinesisStreamsSinkBuilder<InputT>
extends AsyncSinkBaseBuilder<
- InputT, PutRecordsRequestEntry,
KinesisDataStreamsSinkBuilder<InputT>> {
+ InputT, PutRecordsRequestEntry,
KinesisStreamsSinkBuilder<InputT>> {
private static final int DEFAULT_MAX_BATCH_SIZE = 500;
private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
@@ -75,7 +75,7 @@ public class KinesisDataStreamsSinkBuilder<InputT>
private SerializationSchema<InputT> serializationSchema;
private PartitionKeyGenerator<InputT> partitionKeyGenerator;
- KinesisDataStreamsSinkBuilder() {}
+ KinesisStreamsSinkBuilder() {}
/**
* Sets the name of the KDS stream that the sink will connect to. There is
no default for this
@@ -83,40 +83,40 @@ public class KinesisDataStreamsSinkBuilder<InputT>
* fail.
*
* @param streamName the name of the stream
- * @return {@link KinesisDataStreamsSinkBuilder} itself
+ * @return {@link KinesisStreamsSinkBuilder} itself
*/
- public KinesisDataStreamsSinkBuilder<InputT> setStreamName(String
streamName) {
+ public KinesisStreamsSinkBuilder<InputT> setStreamName(String streamName) {
this.streamName = streamName;
return this;
}
- public KinesisDataStreamsSinkBuilder<InputT> setSerializationSchema(
+ public KinesisStreamsSinkBuilder<InputT> setSerializationSchema(
SerializationSchema<InputT> serializationSchema) {
this.serializationSchema = serializationSchema;
return this;
}
- public KinesisDataStreamsSinkBuilder<InputT> setPartitionKeyGenerator(
+ public KinesisStreamsSinkBuilder<InputT> setPartitionKeyGenerator(
PartitionKeyGenerator<InputT> partitionKeyGenerator) {
this.partitionKeyGenerator = partitionKeyGenerator;
return this;
}
- public KinesisDataStreamsSinkBuilder<InputT> setFailOnError(boolean
failOnError) {
+ public KinesisStreamsSinkBuilder<InputT> setFailOnError(boolean
failOnError) {
this.failOnError = failOnError;
return this;
}
- public KinesisDataStreamsSinkBuilder<InputT> setKinesisClientProperties(
+ public KinesisStreamsSinkBuilder<InputT> setKinesisClientProperties(
Properties kinesisClientProperties) {
this.kinesisClientProperties = kinesisClientProperties;
return this;
}
@Override
- public KinesisDataStreamsSink<InputT> build() {
- return new KinesisDataStreamsSink<>(
- new KinesisDataStreamsSinkElementConverter.Builder<InputT>()
+ public KinesisStreamsSink<InputT> build() {
+ return new KinesisStreamsSink<>(
+ new KinesisStreamsSinkElementConverter.Builder<InputT>()
.setSerializationSchema(serializationSchema)
.setPartitionKeyGenerator(partitionKeyGenerator)
.build(),
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
similarity index 88%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
index 2873603..bca7442 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
@@ -32,7 +32,7 @@ import
software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
* PartitionKeyGenerator} lambda to transform the input element into a String.
*/
@Internal
-public class KinesisDataStreamsSinkElementConverter<InputT>
+public class KinesisStreamsSinkElementConverter<InputT>
implements ElementConverter<InputT, PutRecordsRequestEntry> {
/** A serialization schema to specify how the input element should be
serialized. */
@@ -43,7 +43,7 @@ public class KinesisDataStreamsSinkElementConverter<InputT>
*/
private final PartitionKeyGenerator<InputT> partitionKeyGenerator;
- private KinesisDataStreamsSinkElementConverter(
+ private KinesisStreamsSinkElementConverter(
SerializationSchema<InputT> serializationSchema,
PartitionKeyGenerator<InputT> partitionKeyGenerator) {
this.serializationSchema = serializationSchema;
@@ -62,7 +62,7 @@ public class KinesisDataStreamsSinkElementConverter<InputT>
return new Builder<>();
}
- /** A builder for the KinesisDataStreamsSinkElementConverter. */
+ /** A builder for the KinesisStreamsSinkElementConverter. */
public static class Builder<InputT> {
private SerializationSchema<InputT> serializationSchema;
@@ -80,16 +80,16 @@ public class KinesisDataStreamsSinkElementConverter<InputT>
return this;
}
- public KinesisDataStreamsSinkElementConverter<InputT> build() {
+ public KinesisStreamsSinkElementConverter<InputT> build() {
Preconditions.checkNotNull(
serializationSchema,
"No SerializationSchema was supplied to the "
- + "KinesisDataStreamsSinkElementConverter
builder.");
+ + "KinesisStreamsSinkElementConverter builder.");
Preconditions.checkNotNull(
partitionKeyGenerator,
"No PartitionKeyGenerator lambda was supplied to the "
- + "KinesisDataStreamsSinkElementConverter
builder.");
- return new KinesisDataStreamsSinkElementConverter<>(
+ + "KinesisStreamsSinkElementConverter builder.");
+ return new KinesisStreamsSinkElementConverter<>(
serializationSchema, partitionKeyGenerator);
}
}
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
similarity index 90%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
index c045041..c5fc212 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
@@ -50,23 +50,22 @@ import static
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionCla
import static
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
/**
- * Sink writer created by {@link KinesisDataStreamsSink} to write to Kinesis
Data Streams. More
- * details on the operation of this sink writer may be found in the doc for
{@link
- * KinesisDataStreamsSink}. More details on the internals of this sink writer
may be found in {@link
- * AsyncSinkWriter}.
+ * Sink writer created by {@link KinesisStreamsSink} to write to Kinesis Data
Streams. More details
+ * on the operation of this sink writer may be found in the doc for {@link
KinesisStreamsSink}. More
+ * details on the internals of this sink writer may be found in {@link
AsyncSinkWriter}.
*
* <p>The {@link KinesisAsyncClient} used here may be configured in the
standard way for the AWS SDK
* 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID}
and {@code
* AWS_SECRET_ACCESS_KEY} through environment variables etc.
*/
-class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT,
PutRecordsRequestEntry> {
- private static final Logger LOG =
LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class);
+class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT,
PutRecordsRequestEntry> {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisStreamsSinkWriter.class);
private static final FatalExceptionClassifier
RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER =
FatalExceptionClassifier.withRootCauseOfType(
ResourceNotFoundException.class,
err ->
- new KinesisDataStreamsException(
+ new KinesisStreamsException(
"Encountered non-recoverable exception
relating to not being able to find the specified resources",
err));
@@ -95,7 +94,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRe
/* Flag to whether fatally fail any time we encounter an exception when
persisting records */
private final boolean failOnError;
- KinesisDataStreamsSinkWriter(
+ KinesisStreamsSinkWriter(
ElementConverter<InputT, PutRecordsRequestEntry> elementConverter,
Sink.InitContext context,
int maxBatchSize,
@@ -122,7 +121,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRe
Collections.emptyList());
}
- KinesisDataStreamsSinkWriter(
+ KinesisStreamsSinkWriter(
ElementConverter<InputT, PutRecordsRequestEntry> elementConverter,
Sink.InitContext context,
int maxBatchSize,
@@ -161,8 +160,8 @@ class KinesisDataStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRe
kinesisClientProperties,
httpClient,
KinesisAsyncClient.builder(),
-
KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
-
KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
+
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
+
KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
}
@Override
@@ -220,7 +219,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRe
if (failOnError) {
getFatalExceptionCons()
- .accept(new
KinesisDataStreamsException.KinesisDataStreamsFailFastException());
+ .accept(new
KinesisStreamsException.KinesisStreamsFailFastException());
return;
}
List<PutRecordsRequestEntry> failedRequestEntries =
@@ -243,9 +242,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRe
}
if (failOnError) {
getFatalExceptionCons()
- .accept(
- new
KinesisDataStreamsException.KinesisDataStreamsFailFastException(
- err));
+ .accept(new
KinesisStreamsException.KinesisStreamsFailFastException(err));
return false;
}
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializer.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializer.java
similarity index 95%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializer.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializer.java
index f1986ef..ad1bd8f 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializer.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializer.java
@@ -31,7 +31,7 @@ import java.nio.charset.StandardCharsets;
/** Kinesis Streams implementation {@link AsyncSinkWriterStateSerializer}. */
@Internal
-public class KinesisDataStreamsStateSerializer
+public class KinesisStreamsStateSerializer
extends AsyncSinkWriterStateSerializer<PutRecordsRequestEntry> {
@Override
protected void serializeRequestToStream(PutRecordsRequestEntry request,
DataOutputStream out)
@@ -51,7 +51,7 @@ public class KinesisDataStreamsStateSerializer
if (request.explicitHashKey() != null) {
throw new IllegalStateException(
String.format(
- "KinesisDataStreamsStateSerializer is incompatible
with ElementConverter."
+ "KinesisStreamsStateSerializer is incompatible
with ElementConverter."
+ "Serializer version %d does not support
explicit hash key.",
getVersion()));
}
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator.java
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator.java
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
similarity index 96%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
index 361db54..c5ae78b 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;
import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder;
-import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink;
-import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkBuilder;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkBuilder;
import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
@@ -105,8 +105,8 @@ public class KinesisDynamicSink extends
AsyncDynamicTableSink<PutRecordsRequestE
SerializationSchema<RowData> serializationSchema =
encodingFormat.createRuntimeEncoder(context, consumedDataType);
- KinesisDataStreamsSinkBuilder<RowData> builder =
- KinesisDataStreamsSink.<RowData>builder()
+ KinesisStreamsSinkBuilder<RowData> builder =
+ KinesisStreamsSink.<RowData>builder()
.setSerializationSchema(serializationSchema)
.setPartitionKeyGenerator(partitioner)
.setKinesisClientProperties(kinesisClientProperties)
@@ -114,7 +114,7 @@ public class KinesisDynamicSink extends
AsyncDynamicTableSink<PutRecordsRequestE
Optional.ofNullable(failOnError).ifPresent(builder::setFailOnError);
addAsyncOptionsToSinkBuilder(builder);
- KinesisDataStreamsSink<RowData> kdsSink = builder.build();
+ KinesisStreamsSink<RowData> kdsSink = builder.build();
return SinkV2Provider.of(kdsSink);
}
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
similarity index 92%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
index 27a8501..5aa0931 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
-import
org.apache.flink.connector.kinesis.table.util.KinesisDataStreamsConnectorOptionsUtils;
+import
org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
@@ -39,7 +39,7 @@ import static
org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.S
import static
org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_PARTITIONER;
import static
org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_PARTITIONER_FIELD_DELIMITER;
import static
org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.STREAM;
-import static
org.apache.flink.connector.kinesis.table.util.KinesisDataStreamsConnectorOptionsUtils.KINESIS_CLIENT_PROPERTIES_KEY;
+import static
org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.KINESIS_CLIENT_PROPERTIES_KEY;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
/** Factory for creating {@link KinesisDynamicSink}. */
@@ -52,8 +52,8 @@ public class KinesisDynamicTableSinkFactory extends
AsyncDynamicTableSinkFactory
AsyncDynamicSinkContext factoryContext = new
AsyncDynamicSinkContext(this, context);
- KinesisDataStreamsConnectorOptionsUtils optionsUtils =
- new KinesisDataStreamsConnectorOptionsUtils(
+ KinesisStreamsConnectorOptionsUtils optionsUtils =
+ new KinesisStreamsConnectorOptionsUtils(
factoryContext.getResolvedOptions(),
factoryContext.getTableOptions(),
(RowType)
factoryContext.getPhysicalDataType().getLogicalType(),
@@ -105,8 +105,8 @@ public class KinesisDynamicTableSinkFactory extends
AsyncDynamicTableSinkFactory
options.add(SINK_PARTITIONER);
options.add(SINK_PARTITIONER_FIELD_DELIMITER);
options.add(SINK_FAIL_ON_ERROR);
- return
KinesisDataStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper
- .addDeprecatedKeys(options);
+ return
KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper.addDeprecatedKeys(
+ options);
}
private static void validateKinesisPartitioner(
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisDataStreamsConnectorOptionsUtils.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
similarity index 99%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisDataStreamsConnectorOptionsUtils.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
index 4981236..4b30fe0 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisDataStreamsConnectorOptionsUtils.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
@@ -58,7 +58,7 @@ import static
org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.S
* for handling each specified set of options.
*/
@Internal
-public class KinesisDataStreamsConnectorOptionsUtils {
+public class KinesisStreamsConnectorOptionsUtils {
/** Key for accessing kinesisAsyncClient properties. */
public static final String KINESIS_CLIENT_PROPERTIES_KEY =
"sink.client.properties";
@@ -79,7 +79,7 @@ public class KinesisDataStreamsConnectorOptionsUtils {
KinesisProducerOptionsMapper.KINESIS_PRODUCER_PREFIX
};
- public KinesisDataStreamsConnectorOptionsUtils(
+ public KinesisStreamsConnectorOptionsUtils(
Map<String, String> options,
ReadableConfig tableOptions,
RowType physicalType,
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/resources/log4j2.properties
b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/resources/log4j2.properties
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/resources/log4j2.properties
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/main/resources/log4j2.properties
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java
similarity index 85%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java
index c886fd7..a682762 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java
@@ -23,8 +23,8 @@ import
org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.assertj.core.api.Assertions;
import org.junit.Test;
-/** Covers construction, defaults and sanity checking of
KinesisDataStreamsSinkBuilder. */
-public class KinesisDataStreamsSinkBuilderTest {
+/** Covers construction, defaults and sanity checking of
KinesisStreamsSinkBuilder. */
+public class KinesisStreamsSinkBuilderTest {
private static final SerializationSchema<String> SERIALIZATION_SCHEMA =
new SimpleStringSchema();
private static final PartitionKeyGenerator<String> PARTITION_KEY_GENERATOR
=
@@ -33,9 +33,9 @@ public class KinesisDataStreamsSinkBuilderTest {
@Test
public void elementConverterOfSinkMustBeSetWhenBuilt() {
Assertions.assertThatExceptionOfType(NullPointerException.class)
- .isThrownBy(() ->
KinesisDataStreamsSink.builder().setStreamName("stream").build())
+ .isThrownBy(() ->
KinesisStreamsSink.builder().setStreamName("stream").build())
.withMessageContaining(
- "No SerializationSchema was supplied to the
KinesisDataStreamsSinkElementConverter builder.");
+ "No SerializationSchema was supplied to the
KinesisStreamsSinkElementConverter builder.");
}
@Test
@@ -43,7 +43,7 @@ public class KinesisDataStreamsSinkBuilderTest {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(
() ->
- KinesisDataStreamsSink.<String>builder()
+ KinesisStreamsSink.<String>builder()
.setPartitionKeyGenerator(PARTITION_KEY_GENERATOR)
.setSerializationSchema(SERIALIZATION_SCHEMA)
.build())
@@ -56,7 +56,7 @@ public class KinesisDataStreamsSinkBuilderTest {
Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(
() ->
- KinesisDataStreamsSink.<String>builder()
+ KinesisStreamsSink.<String>builder()
.setStreamName("")
.setPartitionKeyGenerator(PARTITION_KEY_GENERATOR)
.setSerializationSchema(SERIALIZATION_SCHEMA)
@@ -70,12 +70,12 @@ public class KinesisDataStreamsSinkBuilderTest {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(
() ->
- KinesisDataStreamsSink.<String>builder()
+ KinesisStreamsSink.<String>builder()
.setStreamName("stream")
.setPartitionKeyGenerator(PARTITION_KEY_GENERATOR)
.build())
.withMessageContaining(
- "No SerializationSchema was supplied to the
KinesisDataStreamsSinkElementConverter builder.");
+ "No SerializationSchema was supplied to the
KinesisStreamsSinkElementConverter builder.");
}
@Test
@@ -83,11 +83,11 @@ public class KinesisDataStreamsSinkBuilderTest {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(
() ->
- KinesisDataStreamsSink.<String>builder()
+ KinesisStreamsSink.<String>builder()
.setStreamName("stream")
.setSerializationSchema(SERIALIZATION_SCHEMA)
.build())
.withMessageContaining(
- "No PartitionKeyGenerator lambda was supplied to the
KinesisDataStreamsSinkElementConverter builder.");
+ "No PartitionKeyGenerator lambda was supplied to the
KinesisStreamsSinkElementConverter builder.");
}
}
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
similarity index 98%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
index 3e23e87..d93d899 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
@@ -63,7 +63,7 @@ import static
org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROT
import static
org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
/** IT cases for using Kinesis Data Streams Sink based on Kinesalite. */
-public class KinesisDataStreamsSinkITCase extends TestLogger {
+public class KinesisStreamsSinkITCase extends TestLogger {
private static final String DEFAULT_FIRST_SHARD_NAME =
"shardId-000000000000";
@@ -409,10 +409,10 @@ public class KinesisDataStreamsSinkITCase extends
TestLogger {
private String kinesaliteStreamName = null;
private String sinkConnectionStreamName;
private SerializationSchema<String> serializationSchema =
- KinesisDataStreamsSinkITCase.this.serializationSchema;
+ KinesisStreamsSinkITCase.this.serializationSchema;
private PartitionKeyGenerator<String> partitionKeyGenerator =
- KinesisDataStreamsSinkITCase.this.partitionKeyGenerator;
- private Properties properties =
KinesisDataStreamsSinkITCase.this.getDefaultProperties();
+ KinesisStreamsSinkITCase.this.partitionKeyGenerator;
+ private Properties properties =
KinesisStreamsSinkITCase.this.getDefaultProperties();
public void runScenario() throws Exception {
if (kinesaliteStreamName != null) {
@@ -430,8 +430,8 @@ public class KinesisDataStreamsSinkITCase extends
TestLogger {
(long) numberOfElementsToSend))
.returns(String.class);
- KinesisDataStreamsSink<String> kdsSink =
- KinesisDataStreamsSink.<String>builder()
+ KinesisStreamsSink<String> kdsSink =
+ KinesisStreamsSink.<String>builder()
.setSerializationSchema(serializationSchema)
.setPartitionKeyGenerator(partitionKeyGenerator)
.setMaxTimeInBufferMS(bufferMaxTimeMS)
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializerTest.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializerTest.java
similarity index 89%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializerTest.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializerTest.java
index 01a9415..ce1bde5 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializerTest.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializerTest.java
@@ -30,11 +30,11 @@ import java.io.IOException;
import static
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
import static
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState;
-/** Test class for {@link KinesisDataStreamsStateSerializer}. */
-public class KinesisDataStreamsStateSerializerTest {
+/** Test class for {@link KinesisStreamsStateSerializer}. */
+public class KinesisStreamsStateSerializerTest {
private static final ElementConverter<String, PutRecordsRequestEntry>
ELEMENT_CONVERTER =
- KinesisDataStreamsSinkElementConverter.<String>builder()
+ KinesisStreamsSinkElementConverter.<String>builder()
.setSerializationSchema(new SimpleStringSchema())
.setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode()))
.build();
@@ -44,7 +44,7 @@ public class KinesisDataStreamsStateSerializerTest {
BufferedRequestState<PutRecordsRequestEntry> expectedState =
getTestState(ELEMENT_CONVERTER, this::getRequestSize);
- KinesisDataStreamsStateSerializer serializer = new
KinesisDataStreamsStateSerializer();
+ KinesisStreamsStateSerializer serializer = new
KinesisStreamsStateSerializer();
BufferedRequestState<PutRecordsRequestEntry> actualState =
serializer.deserialize(1, serializer.serialize(expectedState));
assertThatBufferStatesAreEqual(actualState, expectedState);
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
similarity index 92%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
index e17afa2..362e25f 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
@@ -19,7 +19,7 @@ package org.apache.flink.connector.kinesis.sink.examples;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -31,8 +31,7 @@ import software.amazon.awssdk.utils.ImmutableMap;
import java.util.Properties;
/**
- * An example application demonstrating how to use the {@link
KinesisDataStreamsSink} to sink into
- * KDS.
+ * An example application demonstrating how to use the {@link
KinesisStreamsSink} to sink into KDS.
*
* <p>The {@link KinesisAsyncClient} used here may be configured in the
standard way for the AWS SDK
* 2.x. e.g. the provision of {@code AWS_ACCESS_KEY_ID} and {@code
AWS_SECRET_ACCESS_KEY} through
@@ -54,8 +53,8 @@ public class SinkIntoKinesis {
Properties sinkProperties = new Properties();
sinkProperties.put(AWSConfigConstants.AWS_REGION, "your-region-here");
- KinesisDataStreamsSink<String> kdsSink =
- KinesisDataStreamsSink.<String>builder()
+ KinesisStreamsSink<String> kdsSink =
+ KinesisStreamsSink.<String>builder()
.setSerializationSchema(new SimpleStringSchema())
.setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode()))
.setStreamName("your-stream-name")
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
similarity index 98%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
index dfe829e..e50acdb 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.connector.kinesis.table;
import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
@@ -85,7 +85,7 @@ public class KinesisDynamicTableSinkFactoryTest extends
TestLogger {
DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
actualSink.getSinkRuntimeProvider(new
SinkRuntimeProviderContext(false));
Sink<RowData> sinkFunction = ((SinkV2Provider)
sinkFunctionProvider).createSink();
-
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class);
+
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class);
}
@Test
@@ -142,7 +142,7 @@ public class KinesisDynamicTableSinkFactoryTest extends
TestLogger {
DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
actualSink.getSinkRuntimeProvider(new
SinkRuntimeProviderContext(false));
Sink<RowData> sinkFunction = ((SinkV2Provider)
sinkFunctionProvider).createSink();
-
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class);
+
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class);
}
@Test
@@ -171,7 +171,7 @@ public class KinesisDynamicTableSinkFactoryTest extends
TestLogger {
DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
actualSink.getSinkRuntimeProvider(new
SinkRuntimeProviderContext(false));
Sink<RowData> sinkFunction = ((SinkV2Provider)
sinkFunctionProvider).createSink();
-
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class);
+
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class);
}
@Test
@@ -205,7 +205,7 @@ public class KinesisDynamicTableSinkFactoryTest extends
TestLogger {
DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
actualSink.getSinkRuntimeProvider(new
SinkRuntimeProviderContext(false));
Sink<RowData> sinkFunction = ((SinkV2Provider)
sinkFunctionProvider).createSink();
-
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class);
+
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class);
}
@Test
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGeneratorTest.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGeneratorTest.java
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGeneratorTest.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGeneratorTest.java
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java
similarity index 94%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java
index 4ed5a5b..f4a295b 100644
---
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java
+++
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.connector.kinesis.table.util;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import
org.apache.flink.connector.kinesis.table.util.KinesisDataStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper;
+import
org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
@@ -39,7 +39,7 @@ public class KinesisProducerOptionsMapperTest extends
TestLogger {
expectedOptions.put(AWSConfigConstants.TRUST_ALL_CERTIFICATES, "true");
KinesisProducerOptionsMapper producerOptionsMapper =
- new
KinesisDataStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper(
+ new
KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper(
deprecatedOptions);
Map<String, String> actualMappedProperties =
producerOptionsMapper.mapDeprecatedClientOptions();
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/archunit.properties
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/archunit.properties
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/archunit.properties
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/archunit.properties
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/log4j2-test.properties
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/log4j2-test.properties
diff --git
a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/profile
b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/profile
similarity index 100%
rename from
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/profile
rename to
flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/profile
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml
b/flink-connectors/flink-connector-kinesis/pom.xml
index 0bb26bc..e9cb2a5 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -142,7 +142,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
+
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
<version>${project.version}</version>
</dependency>
@@ -206,7 +206,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
+
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
@@ -324,7 +324,7 @@ under the License.
<artifactSet
combine.children="append">
<includes>
<include>org.apache.flink:flink-connector-aws-base:*</include>
-
<include>org.apache.flink:flink-connector-aws-kinesis-data-streams:*</include>
+
<include>org.apache.flink:flink-connector-aws-kinesis-streams:*</include>
<include>com.amazonaws:*</include>
<include>com.google.protobuf:*</include>
<include>org.apache.httpcomponents:*</include>
@@ -396,7 +396,7 @@ under the License.
</excludes>
</filter>
<filter>
-
<artifact>org.apache.flink:flink-connector-aws-kinesis-data-streams:*</artifact>
+
<artifact>org.apache.flink:flink-connector-aws-kinesis-streams:*</artifact>
<excludes>
<exclude>profile</exclude>
</excludes>
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index f3e0bd8..338cd28 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import
org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -62,10 +63,9 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
*
* @param <OUT> Data type to produce into Kinesis Streams
* @deprecated This producer based on the Kinesis Producer Library KPL has
been superseded. The new
- * sink can be found in the module {@code
- * flink-connectors/flink-connector-aws-kinesis-data-streams} and package
{@link
- * org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink}. It is
based on the AWS SDK
- * for Java 2.x. The work to replace this sink was carried out in
FLINK-24227.
+ * sink can be found in the module {@code
flink-connectors/flink-connector-aws-kinesis-streams}
+ * and package {@link KinesisStreamsSink}. It is based on the AWS SDK for
Java 2.x. The work to
+ * replace this sink was carried out in FLINK-24227.
*/
@Deprecated
@PublicEvolving
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
index 62f25db..f8cf637 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.proxy;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsConfigConstants;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
import
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
import org.apache.flink.util.Preconditions;
@@ -63,17 +63,17 @@ public class KinesisProxyV2Factory {
Properties legacyConfigProps = new Properties(configProps);
legacyConfigProps.setProperty(
-
KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX,
+ KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX,
AWSAsyncSinkUtil.formatFlinkUserAgentPrefix(
-
KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
+
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
final KinesisAsyncClient client =
AWSAsyncSinkUtil.createAwsAsyncClient(
legacyConfigProps,
httpClient,
KinesisAsyncClient.builder(),
-
KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
-
KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
+
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
+
KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
return new KinesisProxyV2(client, httpClient, configuration, BACKOFF);
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 684234c..c79e8df 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.util;
import org.apache.flink.annotation.Internal;
import
org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider;
import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
-import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsConfigConstants;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -84,7 +84,7 @@ public class AWSUtil {
// set a Flink-specific user agent
awsClientConfig.setUserAgentPrefix(
AWSAsyncSinkUtil.formatFlinkUserAgentPrefix(
-
KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
+
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
// utilize automatic refreshment of credentials by directly passing the
// AWSCredentialsProvider
diff --git a/flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml
b/flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml
index 69b932a..8b4a2df 100644
--- a/flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml
+++ b/flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml
@@ -29,7 +29,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId>
- <name>Flink : Connectors : SQL : AWS Kinesis Data Firehose</name>
+ <name>Flink : Connectors : SQL : Amazon Kinesis Data Firehose</name>
<dependencies>
<dependency>
diff --git
a/flink-connectors/flink-sql-connector-aws-kinesis-data-streams/pom.xml
b/flink-connectors/flink-sql-connector-aws-kinesis-streams/pom.xml
similarity index 90%
rename from
flink-connectors/flink-sql-connector-aws-kinesis-data-streams/pom.xml
rename to flink-connectors/flink-sql-connector-aws-kinesis-streams/pom.xml
index c3bbc8e..0804a3a 100644
--- a/flink-connectors/flink-sql-connector-aws-kinesis-data-streams/pom.xml
+++ b/flink-connectors/flink-sql-connector-aws-kinesis-streams/pom.xml
@@ -28,13 +28,13 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>flink-sql-connector-aws-kinesis-data-streams</artifactId>
- <name>Flink : Connectors : SQL : AWS Kinesis Data Streams</name>
+ <artifactId>flink-sql-connector-aws-kinesis-streams</artifactId>
+ <name>Flink : Connectors : SQL : Amazon Kinesis Data Streams</name>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
+
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
@@ -57,7 +57,7 @@
<includes>
<include>org.apache.flink:flink-connector-base</include>
<include>org.apache.flink:flink-connector-aws-base</include>
-
<include>org.apache.flink:flink-connector-aws-kinesis-data-streams</include>
+
<include>org.apache.flink:flink-connector-aws-kinesis-streams</include>
<include>software.amazon.awssdk:*</include>
<include>org.reactivestreams:*</include>
<include>com.typesafe.netty:*</include>
@@ -90,7 +90,7 @@
</relocations>
<filters>
<filter>
-
<artifact>org.apache.flink:flink-connector-aws-kinesis-data-streams:*</artifact>
+
<artifact>org.apache.flink:flink-connector-aws-kinesis-streams:*</artifact>
<excludes>
<exclude>profile</exclude>
</excludes>
diff --git
a/flink-connectors/flink-sql-connector-aws-kinesis-data-streams/src/main/resources/META-INF/NOTICE
b/flink-connectors/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE
similarity index 97%
rename from
flink-connectors/flink-sql-connector-aws-kinesis-data-streams/src/main/resources/META-INF/NOTICE
rename to
flink-connectors/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE
index 51d9fa0..8b334b0 100644
---
a/flink-connectors/flink-sql-connector-aws-kinesis-data-streams/src/main/resources/META-INF/NOTICE
+++
b/flink-connectors/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE
@@ -1,4 +1,4 @@
-flink-sql-connector-aws-kinesis-data-streams
+flink-sql-connector-aws-kinesis-streams
Copyright 2014-2021 The Apache Software Foundation
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 0eb1843..a1229f3 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -53,7 +53,7 @@ under the License.
<module>flink-connector-gcp-pubsub</module>
<module>flink-connector-aws-base</module>
<module>flink-connector-kinesis</module>
- <module>flink-connector-aws-kinesis-data-streams</module>
+ <module>flink-connector-aws-kinesis-streams</module>
<module>flink-connector-aws-kinesis-firehose</module>
<module>flink-connector-base</module>
<module>flink-file-sink-common</module>
@@ -104,7 +104,7 @@ under the License.
<module>flink-sql-connector-hive-2.3.6</module>
<module>flink-sql-connector-hive-3.1.2</module>
<module>flink-sql-connector-kafka</module>
-
<module>flink-sql-connector-aws-kinesis-data-streams</module>
+
<module>flink-sql-connector-aws-kinesis-streams</module>
<module>flink-sql-connector-aws-kinesis-firehose</module>
<module>flink-sql-connector-kinesis</module>
<module>flink-sql-connector-pulsar</module>
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/pom.xml
b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/pom.xml
similarity index 94%
rename from
flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/pom.xml
rename to
flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/pom.xml
index f696878..423afd5 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/pom.xml
@@ -28,7 +28,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>flink-end-to-end-tests-aws-kinesis-data-streams</artifactId>
+ <artifactId>flink-end-to-end-tests-aws-kinesis-streams</artifactId>
<name>Flink : E2E Tests : Kinesis SQL tests</name>
<packaging>jar</packaging>
@@ -49,7 +49,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
+
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
@@ -85,7 +85,7 @@
<artifactItems>
<artifactItem>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-sql-connector-aws-kinesis-data-streams</artifactId>
+
<artifactId>flink-sql-connector-aws-kinesis-streams</artifactId>
<version>${project.version}</version>
<destFileName>sql-kinesis-streams.jar</destFileName>
<type>jar</type>
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisDataStreamsTableApiIT.java
b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
similarity index 96%
rename from
flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisDataStreamsTableApiIT.java
rename to
flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
index 2c9c2f7..fcb3b5b 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisDataStreamsTableApiIT.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
@@ -68,11 +68,10 @@ import java.util.function.Function;
import static java.util.concurrent.TimeUnit.SECONDS;
-/** End-to-end test for Kinesis DataStream Table API Sink using Kinesalite. */
-public class KinesisDataStreamsTableApiIT {
+/** End-to-end test for Kinesis Streams Table API Sink using Kinesalite. */
+public class KinesisStreamsTableApiIT {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(KinesisDataStreamsTableApiIT.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KinesisStreamsTableApiIT.class);
private static final String ORDERS_STREAM = "orders";
private static final String INTER_CONTAINER_KINESALITE_ALIAS =
"kinesalite";
@@ -214,7 +213,7 @@ public class KinesisDataStreamsTableApiIT {
GetShardIteratorRequest.builder()
.shardId(DEFAULT_FIRST_SHARD_NAME)
.shardIteratorType(ShardIteratorType.TRIM_HORIZON)
-
.streamName(KinesisDataStreamsTableApiIT.ORDERS_STREAM)
+
.streamName(KinesisStreamsTableApiIT.ORDERS_STREAM)
.build())
.get()
.shardIterator();
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/resources/log4j2-test.properties
similarity index 100%
rename from
flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
rename to
flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/resources/log4j2-test.properties
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/resources/send-orders.sql
b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/resources/send-orders.sql
similarity index 100%
rename from
flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/resources/send-orders.sql
rename to
flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/resources/send-orders.sql
diff --git
a/flink-end-to-end-tests/flink-glue-schema-registry-json-test/pom.xml
b/flink-end-to-end-tests/flink-glue-schema-registry-json-test/pom.xml
index 81f01e0..1912108 100644
--- a/flink-end-to-end-tests/flink-glue-schema-registry-json-test/pom.xml
+++ b/flink-end-to-end-tests/flink-glue-schema-registry-json-test/pom.xml
@@ -114,7 +114,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
+
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
index d5db556..fafa407 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -83,7 +83,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
+
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index c39f798..6a28b30 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -80,7 +80,7 @@ under the License.
<module>flink-glue-schema-registry-avro-test</module>
<module>flink-glue-schema-registry-json-test</module>
<module>flink-end-to-end-tests-scala</module>
- <module>flink-end-to-end-tests-aws-kinesis-data-streams</module>
+ <module>flink-end-to-end-tests-aws-kinesis-streams</module>
<module>flink-end-to-end-tests-aws-kinesis-firehose</module>
</modules>
diff --git a/pom.xml b/pom.xml
index d471a8e..ab7dead 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1481,7 +1481,7 @@ under the License.
<exclude>flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/query/*</exclude>
<exclude>flink-connectors/flink-connector-aws-base/src/test/resources/profile</exclude>
<exclude>flink-connectors/flink-connector-kinesis/src/test/resources/profile</exclude>
-
<exclude>flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/profile</exclude>
+
<exclude>flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/profile</exclude>
<exclude>flink-table/flink-table-code-splitter/src/test/resources/**</exclude>
<exclude>flink-connectors/flink-connector-pulsar/src/test/resources/**</exclude>
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index 9ad4ad1..78562d8 100755
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -107,7 +107,7 @@ flink-connectors/flink-connector-elasticsearch-base,\
flink-connectors/flink-connector-nifi,\
flink-connectors/flink-connector-rabbitmq,\
flink-connectors/flink-connector-kinesis,\
-flink-connectors/flink-connector-aws-kinesis-data-streams,\
+flink-connectors/flink-connector-aws-kinesis-streams,\
flink-connectors/flink-connector-aws-kinesis-firehose,\
flink-metrics/flink-metrics-dropwizard,\
flink-metrics/flink-metrics-graphite,\