This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit edcd0b0aec578ad73ca773fac5102cb026611306
Author: Zichen Liu <[email protected]>
AuthorDate: Mon Jan 24 18:29:37 2022 +0000

    [FLINK-24228][connectors/firehose] Added documentation (&.zh docs) for the 
new Firehose sink.
---
 .../docs/connectors/datastream/firehose.md         | 171 +++++++++++++++++++++
 .../content/docs/connectors/datastream/firehose.md | 171 +++++++++++++++++++++
 2 files changed, 342 insertions(+)

diff --git a/docs/content.zh/docs/connectors/datastream/firehose.md 
b/docs/content.zh/docs/connectors/datastream/firehose.md
new file mode 100644
index 0000000..ecc77ce
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/firehose.md
@@ -0,0 +1,171 @@
+---
+title: Firehose
+weight: 5
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon Kinesis Data Firehose Sink
+
+The Firehose sink writes to [Amazon Kinesis Data 
Firehose](https://aws.amazon.com/kinesis/data-firehose/).
+
+Follow the instructions from the [Amazon Kinesis Data Firehose Developer 
Guide](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)
+to setup a Kinesis Data Firehose delivery stream.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< artifact flink-connector-aws-kinesis-firehose >}}
+
+The `KinesisFirehoseSink` uses [AWS v2 SDK for 
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html)
 to write data from a Flink stream into a Firehose delivery stream.
+
+{{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
+{{< tab "Java" >}}
+```java
+KinesisFirehoseSinkElementConverter<String> elementConverter =
+    KinesisFirehoseSinkElementConverter.<String>builder()
+        .setSerializationSchema(new SimpleStringSchema())
+        .build();
+
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+
+KinesisFirehoseSink<String> kdfSink =
+    KinesisFirehoseSink.<String>builder()
+        .setFirehoseClientProperties(sinkProperties)    // Required
+        .setElementConverter(elementConverter)         // Required
+        .setDeliveryStreamName("your-stream-name")     // Required
+        .setFailOnError(false)                         // Optional
+        .setMaxBatchSize(500)                          // Optional
+        .setMaxInFlightRequests(50)                    // Optional
+        .setMaxBufferedRequests(10_000)                // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
+        .setMaxTimeInBufferMS(5000)                    // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .build();
+
+flinkStream.sinkTo(kdfSink);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val elementConverter =
+    KinesisFirehoseSinkElementConverter.<String>builder()
+        .setSerializationSchema(new SimpleStringSchema())
+        .build()
+
+Properties sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
+
+val kdfSink =
+    KinesisFirehoseSink.<String>builder()
+        .setFirehoseClientProperties(sinkProperties)    // Required
+        .setElementConverter(elementConverter)         // Required
+        .setDeliveryStreamName("your-stream-name")     // Required
+        .setFailOnError(false)                         // Optional
+        .setMaxBatchSize(500)                          // Optional
+        .setMaxInFlightRequests(50)                    // Optional
+        .setMaxBufferedRequests(10_000)                // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
+        .setMaxTimeInBufferMS(5000)                    // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .build()
+
+flinkStream.sinkTo(kdfSink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's Firehose sink is created by using the static builder 
`KinesisFirehoseSink.<String>builder()`.
+
+1. __setFirehoseClientProperties(Properties sinkProperties)__
+    * Required.
+    * Supplies credentials, region and other parameters to the Firehose client.
+2. __setElementConverter(KinesisFirehoseSinkElementConverter 
elementConverter)__
+    * Required.
+    * Supplies a serialization schema to the output. May be built using the 
following builder `KinesisFirehoseSinkElementConverter.<String>builder()` as 
per the example.
+3. __setDeliveryStreamName(String deliveryStreamName)__
+    * Required.
+    * Name of the delivery stream to sink to.
+4. _setFailOnError(boolean failOnError)_
+    * Optional. Default: `false`.
+    * Whether failed requests to write records to Firehose are treated as 
fatal exceptions in the sink.
+5. _setMaxBatchSize(int maxBatchSize)_
+    * Optional. Default: `500`.
+    * Maximum size of a batch to write to Firehose.
+6. _setMaxInFlightRequests(int maxInFlightRequests)_
+    * Optional. Default: `50`.
+    * The maximum number of in flight requests allowed before the sink applies 
backpressure.
+7. _setMaxBufferedRequests(int maxBufferedRequests)_
+    * Optional. Default: `10_000`.
+    * The maximum number of records that may be buffered in the sink before 
backpressure is applied.
+8. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+    * Optional. Default: `4 * 1024 * 1024`.
+    * The maximum size (in bytes) a batch may become. All batches sent will be 
smaller than or equal to this size.
+9. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+    * Optional. Default: `5000`.
+    * The maximum time a record may stay in the sink before being flushed.
+10. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+    * Optional. Default: `1000 * 1024`.
+    * The maximum record size that the sink will accept, records larger than 
this will be automatically rejected.
+11. _build()_
+    * Constructs and returns the Firehose sink.
+
+
+## Using Custom Firehose Endpoints
+
+It is sometimes desirable to have Flink operate as a consumer or producer 
against a Firehose VPC endpoint or a non-AWS
+Firehose endpoint such as [Localstack](https://localstack.cloud/); this is 
especially useful when performing
+functional testing of a Flink application. The AWS endpoint that would 
normally be inferred by the AWS region set in the
+Flink configuration must be overridden via a configuration property.
+
+To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and 
`AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the 
endpoint URL.
+
+{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
+{{< tab "Java" >}}
+```java
+Properties producerConfig = new Properties();
+        producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+        producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"aws_access_key_id");
+        producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+        producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, 
"http://localhost:4566";);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val producerConfig = new Properties()
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566";)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/firehose.md 
b/docs/content/docs/connectors/datastream/firehose.md
new file mode 100644
index 0000000..b224665
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/firehose.md
@@ -0,0 +1,171 @@
+---
+title: Firehose
+weight: 5
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon Kinesis Data Firehose Sink
+
+The Firehose sink writes to [Amazon Kinesis Data 
Firehose](https://aws.amazon.com/kinesis/data-firehose/).
+
+Follow the instructions from the [Amazon Kinesis Data Firehose Developer 
Guide](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)
+to setup a Kinesis Data Firehose delivery stream.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< artifact flink-connector-aws-kinesis-firehose >}}
+
+The `KinesisFirehoseSink` uses [AWS v2 SDK for 
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html)
 to write data from a Flink stream into a Firehose delivery stream.
+
+{{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
+{{< tab "Java" >}}
+```java
+KinesisFirehoseSinkElementConverter<String> elementConverter =
+    KinesisFirehoseSinkElementConverter.<String>builder()
+        .setSerializationSchema(new SimpleStringSchema())
+        .build();
+
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+
+KinesisFirehoseSink<String> kdfSink =
+    KinesisFirehoseSink.<String>builder()
+        .setFirehoseClientProperties(sinkProperties)    // Required
+        .setElementConverter(elementConverter)         // Required
+        .setDeliveryStreamName("your-stream-name")     // Required
+        .setFailOnError(false)                         // Optional
+        .setMaxBatchSize(500)                          // Optional
+        .setMaxInFlightRequests(50)                    // Optional
+        .setMaxBufferedRequests(10_000)                // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
+        .setMaxTimeInBufferMS(5000)                    // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .build();
+
+flinkStream.sinkTo(kdfSink);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val elementConverter =
+    KinesisFirehoseSinkElementConverter.<String>builder()
+        .setSerializationSchema(new SimpleStringSchema())
+        .build()
+
+Properties sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
+
+val kdfSink =
+    KinesisFirehoseSink.<String>builder()
+        .setFirehoseClientProperties(sinkProperties)    // Required
+        .setElementConverter(elementConverter)         // Required
+        .setDeliveryStreamName("your-stream-name")     // Required
+        .setFailOnError(false)                         // Optional
+        .setMaxBatchSize(500)                          // Optional
+        .setMaxInFlightRequests(50)                    // Optional
+        .setMaxBufferedRequests(10_000)                // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
+        .setMaxTimeInBufferMS(5000)                    // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .build()
+
+flinkStream.sinkTo(kdfSink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's Firehose sink is created by using the static builder 
`KinesisFirehoseSink.<String>builder()`.
+
+1. __setFirehoseClientProperties(Properties sinkProperties)__
+    * Required.
+    * Supplies credentials, region and other parameters to the Firehose client.
+2. __setElementConverter(KinesisFirehoseSinkElementConverter 
elementConverter)__
+    * Required.
+    * Supplies a serialization schema to the output. May be built using the 
following builder `KinesisFirehoseSinkElementConverter.<String>builder()` as 
per the example.
+3. __setDeliveryStreamName(String deliveryStreamName)__
+    * Required.
+    * Name of the delivery stream to sink to.
+4. _setFailOnError(boolean failOnError)_
+    * Optional. Default: `false`.
+    * Whether failed requests to write records to Firehose are treated as 
fatal exceptions in the sink.
+5. _setMaxBatchSize(int maxBatchSize)_
+    * Optional. Default: `500`.
+    * Maximum size of a batch to write to Firehose.
+6. _setMaxInFlightRequests(int maxInFlightRequests)_
+    * Optional. Default: `50`.
+    * The maximum number of in flight requests allowed before the sink applies 
backpressure.
+7. _setMaxBufferedRequests(int maxBufferedRequests)_
+    * Optional. Default: `10_000`.
+    * The maximum number of records that may be buffered in the sink before 
backpressure is applied. 
+8. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+    * Optional. Default: `4 * 1024 * 1024`.
+    * The maximum size (in bytes) a batch may become. All batches sent will be 
smaller than or equal to this size.
+9. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+    * Optional. Default: `5000`.
+    * The maximum time a record may stay in the sink before being flushed.
+10. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+    * Optional. Default: `1000 * 1024`.
+    * The maximum record size that the sink will accept, records larger than 
this will be automatically rejected.
+11. _build()_
+    * Constructs and returns the Firehose sink.
+
+
+## Using Custom Firehose Endpoints
+
+It is sometimes desirable to have Flink operate as a consumer or producer 
against a Firehose VPC endpoint or a non-AWS
+Firehose endpoint such as [Localstack](https://localstack.cloud/); this is 
especially useful when performing
+functional testing of a Flink application. The AWS endpoint that would 
normally be inferred by the AWS region set in the
+Flink configuration must be overridden via a configuration property.
+
+To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and 
`AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the 
endpoint URL.
+
+{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
+{{< tab "Java" >}}
+```java
+Properties producerConfig = new Properties();
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566";);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val producerConfig = new Properties()
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566";)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}

Reply via email to