hlteoh37 commented on code in PR #179:
URL: 
https://github.com/apache/flink-connector-aws/pull/179#discussion_r1832518101


##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -27,644 +27,348 @@ under the License.
 
 # Amazon Kinesis Data Streams Connector
 
-The Kinesis connector provides access to [Amazon Kinesis Data 
Streams](http://aws.amazon.com/kinesis/streams/).
-
-To use this connector, add one or more of the following dependencies to your 
project, depending on whether you are reading from and/or writing to Kinesis 
Data Streams:
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">KDS Connectivity</th>
-      <th class="text-left">Maven Dependency</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>Source</td>
-        <td>{{< connector_artifact flink-connector-kinesis kinesis >}}</td>
-    </tr>
-    <tr>
-        <td>Sink</td>
-        <td>{{< connector_artifact flink-connector-aws-kinesis-streams kinesis 
>}}</td>
-    </tr>
-  </tbody>
-</table>
+The Kinesis connector allows users to read from and write to [Amazon Kinesis Data Streams](http://aws.amazon.com/kinesis/streams/).
 
-{{< py_connector_download_link "kinesis" >}}
+## Dependency
 
-## Using the Amazon Kinesis Streams Service
-Follow the instructions from the [Amazon Kinesis Streams Developer 
Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
-to setup Kinesis streams.
+To use this connector, add the below dependency to your project:
 
-## Configuring Access to Kinesis with IAM
-Make sure to create the appropriate IAM policy to allow reading / writing to / 
from the Kinesis streams. See examples 
[here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
+{{< connector_artifact flink-connector-aws-kinesis-streams kinesis >}}
 
-Depending on your deployment you would choose a different Credentials Provider 
to allow access to Kinesis.
-By default, the `AUTO` Credentials Provider is used.
-If the access key ID and secret key are set in the configuration, the `BASIC` 
provider is used.  
+For use in PyFlink jobs, use the following dependency:
 
-A specific Credentials Provider can **optionally** be set by using the 
`AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.
- 
-Supported Credential Providers are:
-* `AUTO` - Using the default AWS Credentials Provider chain that searches for 
credentials in the following order: `ENV_VARS`, `SYS_PROPS`, 
`WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
-* `BASIC` - Using access key ID and secret key supplied as configuration. 
-* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment 
variables.
-* `SYS_PROP` - Using Java system properties aws.accessKeyId and aws.secretKey.
-* `CUSTOM` - Use a custom user class as credential provider.
-* `PROFILE` - Use AWS credentials profile file to create the AWS credentials.
-* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials 
for assuming the role must be supplied.
-* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token. 
+{{< py_connector_download_link "kinesis" >}}
 
-## Kinesis Consumer
 
-The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source 
that subscribes to multiple AWS Kinesis
-streams within the same AWS service region, and can transparently handle 
resharding of streams while the job is running. Each subtask of the consumer is
-responsible for fetching data records from multiple Kinesis shards. The number 
of shards fetched by each subtask will
-change as shards are closed and created by Kinesis.
+## Kinesis Streams Source
+The `KinesisStreamsSource` is an exactly-once, parallel streaming data source based on the [FLIP-27 source interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface).
+The source subscribes to a single Amazon Kinesis data stream and reads events while maintaining order within a specific Kinesis `partitionId`.
+The `KinesisStreamsSource` discovers the shards of the stream and reads from them in parallel, depending on the parallelism of the operator.
+For more details on selecting the right parallelism, see the section on [parallelism](#parallelism-and-number-of-shards).
+It also transparently handles discovery of new shards if the Kinesis data stream is resharded while the job is running.
+
+{{< hint info >}}
+Note: Before consuming data, ensure that the Kinesis Data Stream is created 
with `ACTIVE` status on the Amazon Kinesis Data Streams console.
+{{< /hint >}}
 
-Before consuming data from Kinesis streams, make sure that all streams are 
created with the status "ACTIVE" in the Amazon Kinesis Data Stream console.
+The `KinesisStreamsSource` provides a fluent builder for constructing an instance.
+The code snippet below illustrates how to do so.
 
-{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3370" >}}
+{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3371" >}}
 {{< tab "Java" >}}
 ```java
-Properties consumerConfig = new Properties();
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+// Configure the KinesisStreamsSource
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default the connector will read from LATEST
+
+// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
+KinesisStreamsSource<String> kdsSource =
+        KinesisStreamsSource.<String>builder()
+                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+                .setSourceConfig(sourceConfig)
+                .setDeserializationSchema(new SimpleStringSchema())
+                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
+                .build();
 
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+// Specify watermarking strategy and the name of the Kinesis Source operator.
+// Specify return type using TypeInformation.
+// Specify also UID of operator in line with Flink best practice.
+DataStream<String> kinesisRecordsWithEventTimeWatermarks = env.fromSource(kdsSource, WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "Kinesis source")
+        .returns(TypeInformation.of(String.class))
+        .uid("custom-uid");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val consumerConfig = new Properties()
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
-consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
-
-val env = StreamExecutionEnvironment.getExecutionEnvironment
+val sourceConfig = new Configuration()
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON) // This is optional, by default the connector will read from LATEST
 
-val kinesis = env.addSource(new FlinkKinesisConsumer[String](
-    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-```python
-consumer_config = {
-    'aws.region': 'us-east-1',
-    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
-    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
-    'flink.stream.initpos': 'LATEST'
-}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
 
-env = StreamExecutionEnvironment.get_execution_environment()
+val kdsSource = KinesisStreamsSource.builder[String]()
+            .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+            .setSourceConfig(sourceConfig)
+            .setDeserializationSchema(new SimpleStringSchema())
+            .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
+            .build()
 
-kinesis = env.add_source(FlinkKinesisConsumer("stream-1", 
SimpleStringSchema(), consumer_config))
+val kinesisEvents = env.fromSource(kdsSource, WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "Kinesis source")
+            .uid("custom-uid")
 ```
 {{< /tab >}}
-{{< /tabs >}}
 
-The above is a simple example of using the consumer. Configuration for the 
consumer is supplied with a `java.util.Properties`
-instance, the configuration keys for which can be found in 
`AWSConfigConstants` (AWS-specific parameters) and 
-`ConsumerConfigConstants` (Kinesis consumer parameters). The example
-demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". 
The AWS credentials are supplied using the basic method in which
-the AWS access key ID and secret access key are directly supplied in the 
configuration. Also, data is being consumed
-from the newest position in the Kinesis stream (the other option will be 
setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
-to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream 
from the earliest record possible).
-
-Other optional configuration keys for the consumer can be found in 
`ConsumerConfigConstants`.
-
-Note that the configured parallelism of the Flink Kinesis Consumer source
-can be completely independent of the total number of shards in the Kinesis 
streams.
-When the number of shards is larger than the parallelism of the consumer,
-then each consumer subtask can subscribe to multiple shards; otherwise
-if the number of shards is smaller than the parallelism of the consumer,
-then some consumer subtasks will simply be idle and wait until it gets assigned
-new shards (i.e., when the streams are resharded to increase the
-number of shards for higher provisioned Kinesis service throughput).
-
-Also note that the default assignment of shards to subtasks is based on the 
hashes of the shard and stream names,
-which will more-or-less balance the shards across the subtasks.
-However, assuming the default Kinesis shard management is used on the stream 
(UpdateShardCount with `UNIFORM_SCALING`),
-setting `UniformShardAssigner` as the shard assigner on the consumer will much 
more evenly distribute shards to subtasks.
-Assuming the incoming Kinesis records are assigned random Kinesis 
`PartitionKey` or `ExplicitHashKey` values,
-the result is consistent subtask loading.
-If neither the default assigner nor the `UniformShardAssigner` suffice, a 
custom implementation of `KinesisShardAssigner` can be set.
-
-### The `DeserializationSchema`
-
-Flink Kinesis Consumer also needs a schema to know how to turn the binary data 
in a Kinesis Data Stream into Java objects.
-The `KinesisDeserializationSchema` allows users to specify such a schema. The 
`T deserialize(byte[] recordValue, String partitionKey, String seqNum, long 
approxArrivalTimestamp, String stream, String shardId)` 
-method gets called for each Kinesis record.
+The above is a simple example of using the `KinesisStreamsSource`.
+- The Kinesis stream being read from is specified using the Kinesis Stream ARN.
+- Configuration for the `Source` is supplied using an instance of Flink's 
`Configuration` class.
+  The configuration keys can be taken from `AWSConfigOptions` (AWS-specific 
configuration) and `KinesisSourceConfigOptions` (Kinesis Source configuration).
+- The example specifies the starting position as `TRIM_HORIZON` (see 
[Configuring Starting Position](#configuring-starting-position) for more 
information).
+- The deserialization format is specified as `SimpleStringSchema` (see [Deserialization Schema](#deserialization-schema) for more information).
+- The distribution of shards across subtasks is controlled using the `UniformShardAssigner` (see [Shard Assignment Strategy](#shard-assignment-strategy) for more information).
+- The example also specifies an increasing `WatermarkStrategy`, which means 
each record will be tagged with event time specified using 
`approximateArrivalTimestamp`. 
+  Monotonically increasing watermarks will be generated, and subtasks will be 
considered idle if no record is emitted after 1 second.
 
-For convenience, Flink provides the following schemas out of the box:
-  
-1. `TypeInformationSerializationSchema` which creates a schema based on a 
Flink's `TypeInformation`. 
-    This is useful if the data is both written and read by Flink.
-    This schema is a performant Flink-specific alternative to other generic 
serialization approaches.
-    
-2. `GlueSchemaRegistryJsonDeserializationSchema` offers the ability to lookup 
the writer's schema (schema which was used to write the record)
-   in [AWS Glue Schema 
Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). 
Using this, deserialization schema record will be
-   read with the schema retrieved from AWS Glue Schema Registry and 
transformed to either 
`com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema`
-   that represents generic record with a manually provided schema or a JAVA 
POJO generated by 
[mbknor-jackson-jsonSchema](https://github.com/mbknor/mbknor-jackson-jsonSchema).
  
-   
-   <br>To use this deserialization schema one has to add the following 
additional dependency:
-       
-{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}}
-{{< tab "GlueSchemaRegistryJsonDeserializationSchema" >}}
-{{< connector_artifact flink-json-glue-schema-registry kinesis >}}
-{{< /tab >}}
-{{< /tabs >}}
-    
-3. `AvroDeserializationSchema` which reads data serialized with Avro format 
using a statically provided schema. It can
-    infer the schema from Avro generated classes 
(`AvroDeserializationSchema.forSpecific(...)`) or it can work with 
`GenericRecords`
-    with a manually provided schema (with 
`AvroDeserializationSchema.forGeneric(...)`). This deserialization schema 
expects that
-    the serialized records DO NOT contain the embedded schema.
+### Configuring Access to Kinesis with IAM
+Access to Kinesis streams is controlled via IAM identities. Make sure to create the appropriate IAM policy to allow reading from and writing to the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
 
-    - You can use [AWS Glue Schema 
Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)
-      to retrieve the writer’s schema. Similarly, the deserialization record 
will be read with the schema from AWS Glue Schema Registry and transformed
-      (either through 
`GlueSchemaRegistryAvroDeserializationSchema.forGeneric(...)` or 
`GlueSchemaRegistryAvroDeserializationSchema.forSpecific(...)`).
-      For more information on integrating the AWS Glue Schema Registry with 
Apache Flink see
-      [Use Case: Amazon Kinesis Data Analytics for Apache 
Flink](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kinesis-data-analytics-apache-flink).
+Depending on your deployment, you can select a suitable AWS Credentials 
Provider.
+By default, the `AUTO` Credentials Provider is used. 
+If the access key ID and secret key are set in the configuration, the `BASIC` 
provider is used.
 
-    <br>To use this deserialization schema one has to add the following 
additional dependency:
-    
-{{< tabs "71c8eb0c-6a78-476f-a52e-8a46d83f2ca4" >}}
-{{< tab "AvroDeserializationSchema" >}}
-{{< artifact flink-avro >}}
-{{< /tab >}}
-{{< tab "GlueSchemaRegistryAvroDeserializationSchema" >}}
-{{< connector_artifact flink-avro-glue-schema-registry kinesis >}}
-{{< /tab >}}
-{{< /tabs >}}
+A specific Credentials Provider can **optionally** be set by using the 
`AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.
 
-### Configuring Starting Position
+Supported Credential Providers are:
+* `AUTO` - Using the default AWS Credentials Provider chain that searches for 
credentials in the following order: `ENV_VARS`, `SYS_PROPS`, 
`WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
+* `BASIC` - Using access key ID and secret key supplied as configuration.
+* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment 
variables.
+* `SYS_PROP` - Using Java system properties aws.accessKeyId and aws.secretKey.
+* `CUSTOM` - Use a custom user class as credential provider.
+* `PROFILE` - Use AWS credentials profile file to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials 
for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token.
 
-The Flink Kinesis Consumer currently provides the following options to 
configure where to start reading Kinesis streams, simply by setting 
`ConsumerConfigConstants.STREAM_INITIAL_POSITION` to
-one of the following values in the provided configuration properties (the 
naming of the options identically follows [the namings used by the AWS Kinesis 
Streams 
service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
+### Configuring Starting Position
+To specify where the `KinesisStreamsSource` starts reading from the Kinesis stream, users can set `KinesisSourceConfigOptions.STREAM_INITIAL_POSITION` in the source configuration.

Review Comment:
   Added a hint bubble
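
The credentials-provider rule described in the hunk above (`AUTO` by default, `BASIC` when an access key ID and secret key are configured, and an explicitly set provider taking precedence) can be illustrated with a small, dependency-free sketch. This is not the connector's implementation: the class `CredentialsProviderSketch`, the `resolve` helper, and the use of a plain `Map` are hypothetical. The two `basic.*` keys are the ones shown in the removed Python example, and the `aws.credentials.provider` key is an assumption about the value behind `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER`.

```java
import java.util.Map;

// Hypothetical illustration only; this is NOT code from the connector.
public class CredentialsProviderSketch {

    // Provider names as listed in the documentation hunk.
    enum Provider { AUTO, BASIC, ENV_VAR, SYS_PROP, CUSTOM, PROFILE, ASSUME_ROLE, WEB_IDENTITY_TOKEN }

    // Assumed configuration key for explicitly selecting a provider.
    static final String PROVIDER_KEY = "aws.credentials.provider";
    // Keys taken from the removed Python example in the hunk.
    static final String ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
    static final String SECRET_KEY = "aws.credentials.provider.basic.secretkey";

    // Mirrors the documented rule: explicit setting first, then BASIC if both keys are present, else AUTO.
    static Provider resolve(Map<String, String> config) {
        String explicit = config.get(PROVIDER_KEY);
        if (explicit != null) {
            return Provider.valueOf(explicit);
        }
        if (config.containsKey(ACCESS_KEY_ID) && config.containsKey(SECRET_KEY)) {
            return Provider.BASIC;
        }
        return Provider.AUTO;
    }

    public static void main(String[] args) {
        // No credentials settings at all: falls back to the default chain.
        System.out.println(resolve(Map.of())); // prints AUTO
        // Access key ID and secret key supplied: BASIC is selected.
        System.out.println(resolve(Map.of(ACCESS_KEY_ID, "id", SECRET_KEY, "secret"))); // prints BASIC
        // An explicit provider setting wins.
        System.out.println(resolve(Map.of(PROVIDER_KEY, "PROFILE"))); // prints PROFILE
    }
}
```

The sketch only models the selection order stated in the docs; how each provider actually obtains credentials is out of scope here.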



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
