CrynetLogistics commented on a change in pull request #17907:
URL: https://github.com/apache/flink/pull/17907#discussion_r778099668
##########
File path: docs/content/docs/connectors/table/kinesis.md
##########
@@ -629,12 +636,71 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td></td>
<td>
- Sink options for the <code>KinesisProducer</code>.
- Suffix names must match the <a
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html">KinesisProducerConfiguration</a>
setters in lower-case hyphenated style (for example,
<code>sink.producer.collection-max-count</code> or
<code>sink.producer.aggregation-max-count</code>).
- The transformed action keys are passed to the
<code>sink.producer.*</code> to <a
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#fromProperties-java.util.Properties-">KinesisProducerConfigurations#fromProperties</a>.
- Note that some of the defaults are overwritten by
<code>KinesisConfigUtil</code>.
+ Deprecated options previously used by <code>KinesisProducer</code>.
+ Options with equivalent alternatives in
<code>KinesisAsyncClient</code> are matched
+ to their respective properties. Unsupported options are logged out to
user as warnings.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.max-concurrency</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>
+ Maximum number of allowed concurrent requests by
<code>KinesisAsyncClient</code>.
</td>
</tr>
+ <tr>
+ <td><h5>sink.http-client.read-timeout</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">360000</td>
+ <td>Integer</td>
+ <td>
+ Maximum amount of time in ms for requests to be sent by
<code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.protocol.version</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">HTTP2</td>
+ <td>String</td>
+ <td>Http version used by Kinesis Client.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.batch.max-size</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">500</td>
+ <td>Integer</td>
+ <td>Maximum batch size of elements to be passed to
<code>KinesisAsyncClient</code>to be written downstream.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-inflight</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">16</td>
+ <td>Integer</td>
+ <td>Request threshold for uncompleted requests by
<code>KinesisAsyncClient</code>before blocking new write requests.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-buffered</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>String</td>
+ <td>request buffer threshold by <code>KinesisAsyncClient</code>before
blocking new write requests.</td>
Review comment:
Same as above. The render comes out to:

##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams-table/src/main/java/org/apache/flink/connectors/kinesis/table/KinesisDynamicSink.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kinesis.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder;
+import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink;
+import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkBuilder;
+import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter;
+import
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter.PartitionKeyGenerator;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+/** Kinesis-backed {@link AsyncDynamicTableSink}. */
+@Internal
+public class KinesisDynamicSink extends
AsyncDynamicTableSink<PutRecordsRequestEntry>
+ implements SupportsPartitioning {
+
+ /** Consumed data type of the table. */
+ private final DataType consumedDataType;
+
+ /** The Kinesis stream to write to. */
+ private final String stream;
+
+ /** Properties for the Kinesis DataStream Sink. */
+ private final Properties kinesisClientProperties;
+
+ /** Sink format for encoding records to Kinesis. */
+ private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
+
+ /** Partitioner to select Kinesis partition for each item. */
+ private final PartitionKeyGenerator<RowData> partitioner;
+
+ private final Boolean failOnError;
+
+ public KinesisDynamicSink(
+ @Nullable Integer maxBatchSize,
+ @Nullable Integer maxInFlightRequests,
+ @Nullable Integer maxBufferedRequests,
+ @Nullable Long maxBufferSizeInBytes,
+ @Nullable Long maxTimeInBufferMS,
+ @Nullable Boolean failOnError,
+ @Nullable DataType consumedDataType,
+ String stream,
+ @Nullable Properties kinesisClientProperties,
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+ PartitionKeyGenerator<RowData> partitioner) {
+ super(
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ maxBufferSizeInBytes,
+ maxTimeInBufferMS);
+ this.failOnError = failOnError;
+ this.kinesisClientProperties = kinesisClientProperties;
+ this.consumedDataType =
+ Preconditions.checkNotNull(consumedDataType, "Consumed data
type must not be null");
+ this.stream = Preconditions.checkNotNull(stream, "Kinesis stream name
must not be null");
+ this.encodingFormat =
+ Preconditions.checkNotNull(encodingFormat, "Encoding format
must not be null");
+ this.partitioner =
+ Preconditions.checkNotNull(
+ partitioner, "Kinesis partition key generator must not
be null");
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return encodingFormat.getChangelogMode();
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ SerializationSchema<RowData> serializationSchema =
+ encodingFormat.createRuntimeEncoder(context, consumedDataType);
+ ElementConverter<RowData, PutRecordsRequestEntry> elementConverter =
+ KinesisDataStreamsSinkElementConverter.<RowData>builder()
+ .setSerializationSchema(serializationSchema)
+ .setPartitionKeyGenerator(partitioner)
+ .build();
+
+ KinesisDataStreamsSinkBuilder<RowData> builder =
+ KinesisDataStreamsSink.<RowData>builder()
+ .setElementConverter(elementConverter)
+ .setKinesisClientProperties(kinesisClientProperties)
+ .setStreamName(stream);
+
+ Optional.ofNullable(failOnError).ifPresent(builder::setFailOnError);
+ Optional.ofNullable(maxBatchSize).ifPresent(builder::setMaxBatchSize);
+
Optional.ofNullable(maxBufferSizeInBytes).ifPresent(builder::setMaxBatchSizeInBytes);
+
Optional.ofNullable(maxInFlightRequests).ifPresent(builder::setMaxInFlightRequests);
+
Optional.ofNullable(maxBufferedRequests).ifPresent(builder::setMaxBufferedRequests);
+
Optional.ofNullable(maxTimeInBufferMS).ifPresent(builder::setMaxTimeInBufferMS);
+
+ KinesisDataStreamsSink<RowData> kdsSink = builder.build();
+ return SinkProvider.of(kdsSink);
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new KinesisDynamicSink(
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ maxBufferSizeInBytes,
+ maxTimeInBufferMS,
+ failOnError,
+ consumedDataType,
+ stream,
+ kinesisClientProperties,
+ encodingFormat,
+ partitioner);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Kinesis";
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // SupportsPartitioning
+ //
--------------------------------------------------------------------------------------------
+
+ @Override
+ public void applyStaticPartition(Map<String, String> partition) {
+ if (partitioner instanceof RowDataFieldsKinesisKeyGenerator) {
+ ((RowDataFieldsKinesisKeyGenerator)
partitioner).setStaticFields(partition);
+ } else {
+ String msg =
+ ""
+ + "Cannot apply static partition optimization to a
partition class "
+ + "that does not inherit from "
+ +
"org.apache.flink.streaming.connectors.kinesis.table.RowDataKinesisPartitioner.";
+ throw new RuntimeException(msg);
+ }
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Value semantics for equals and hashCode
+ //
--------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ KinesisDynamicSink that = (KinesisDynamicSink) o;
+ return super.equals(o)
+ && Objects.equals(consumedDataType, that.consumedDataType)
+ && Objects.equals(stream, that.stream)
+ && Objects.equals(kinesisClientProperties,
that.kinesisClientProperties)
+ && Objects.equals(encodingFormat, that.encodingFormat)
+ && Objects.equals(partitioner, that.partitioner)
+ && Objects.equals(failOnError, that.failOnError);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ super.hashCode(),
+ consumedDataType,
+ stream,
+ kinesisClientProperties,
+ encodingFormat,
+ partitioner,
+ failOnError);
+ }
+
+ /** Builder class for {@link KinesisDynamicSink}. */
+ @Internal
+ public static class KinesisDynamicTableSinkBuilder
+ extends AsyncDynamicTableSinkBuilder<
+ PutRecordsRequestEntry, KinesisDynamicTableSinkBuilder> {
+
+ private DataType consumedDataType = null;
+ private String stream = null;
+ private Properties kinesisClientProperties = null;
+ private EncodingFormat<SerializationSchema<RowData>> encodingFormat =
null;
+ private PartitionKeyGenerator<RowData> partitioner = null;
+ private Boolean failOnError = null;
+
+ public KinesisDynamicTableSinkBuilder setConsumedDataType(DataType
consumedDataType) {
+ this.consumedDataType = consumedDataType;
+ return this;
+ }
+
+ public KinesisDynamicTableSinkBuilder setStream(String stream) {
+ this.stream = stream;
+ return this;
+ }
+
+ public KinesisDynamicTableSinkBuilder setKinesisClientProperties(
+ Properties kinesisClientProperties) {
+ this.kinesisClientProperties = kinesisClientProperties;
+ return this;
+ }
+
+ public KinesisDynamicTableSinkBuilder setEncodingFormat(
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+ this.encodingFormat = encodingFormat;
+ return this;
+ }
+
+ public KinesisDynamicTableSinkBuilder setFailOnError(Boolean
failOnError) {
+ this.failOnError = failOnError;
+ return this;
+ }
+
+ public KinesisDynamicTableSinkBuilder setPartitioner(
+ PartitionKeyGenerator<RowData> partitioner) {
+ this.partitioner = partitioner;
+ return this;
+ }
+
+ @Override
+ public AsyncDynamicTableSink<PutRecordsRequestEntry> build() {
Review comment:
Could this return a `KinesisDynamicSink`?
##########
File path: docs/content/docs/connectors/table/kinesis.md
##########
@@ -629,12 +636,71 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td></td>
<td>
- Sink options for the <code>KinesisProducer</code>.
- Suffix names must match the <a
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html">KinesisProducerConfiguration</a>
setters in lower-case hyphenated style (for example,
<code>sink.producer.collection-max-count</code> or
<code>sink.producer.aggregation-max-count</code>).
- The transformed action keys are passed to the
<code>sink.producer.*</code> to <a
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#fromProperties-java.util.Properties-">KinesisProducerConfigurations#fromProperties</a>.
- Note that some of the defaults are overwritten by
<code>KinesisConfigUtil</code>.
+ Deprecated options previously used by <code>KinesisProducer</code>.
+ Options with equivalent alternatives in
<code>KinesisAsyncClient</code> are matched
+ to their respective properties. Unsupported options are logged out to
user as warnings.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.max-concurrency</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>
+ Maximum number of allowed concurrent requests by
<code>KinesisAsyncClient</code>.
</td>
</tr>
+ <tr>
+ <td><h5>sink.http-client.read-timeout</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">360000</td>
+ <td>Integer</td>
+ <td>
+ Maximum amount of time in ms for requests to be sent by
<code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.protocol.version</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">HTTP2</td>
+ <td>String</td>
+ <td>Http version used by Kinesis Client.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.batch.max-size</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">500</td>
+ <td>Integer</td>
+ <td>Maximum batch size of elements to be passed to
<code>KinesisAsyncClient</code>to be written downstream.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-inflight</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">16</td>
+ <td>Integer</td>
+ <td>Request threshold for uncompleted requests by
<code>KinesisAsyncClient</code>before blocking new write requests.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-buffered</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>String</td>
+ <td>request buffer threshold by <code>KinesisAsyncClient</code>before
blocking new write requests.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.flush-buffer.size</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">5242880</td>
+ <td>Long</td>
+ <td>Threshold value in bytes for writer buffer in
<code>KinesisAsyncClient</code>before flushing.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.flush-buffer.timeout</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">5000</td>
+ <td>Long</td>
+ <td>Threshold time for an element to be in a buffer
of<code>KinesisAsyncClient</code>before flushing.</td>
Review comment:
I think you would need to add a space between the `of` and the `<code>`.
Here and a few other places...
##########
File path:
flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/filter-large-orders.sql
##########
@@ -36,10 +36,10 @@ CREATE TABLE large_orders (
'connector' = 'kinesis',
'stream' = 'large_orders',
'aws.region' = 'us-east-1',
- 'sink.producer.verify-certificate' = 'false',
- 'sink.producer.kinesis-port' = '4567',
- 'sink.producer.kinesis-endpoint' = 'kinesalite',
- 'sink.producer.aggregation-enabled' = 'false',
Review comment:
Would it be better to add a new test case for the new Table API sink in
a separate file, say `filter-large-orders-new.sql`, and a second test in
`KinesisTableApiITCase` that exercises it, so that we keep the existing test on
the old sink while it is deprecated?
##########
File path: docs/content/docs/connectors/table/kinesis.md
##########
@@ -629,12 +636,71 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td></td>
<td>
- Sink options for the <code>KinesisProducer</code>.
- Suffix names must match the <a
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html">KinesisProducerConfiguration</a>
setters in lower-case hyphenated style (for example,
<code>sink.producer.collection-max-count</code> or
<code>sink.producer.aggregation-max-count</code>).
- The transformed action keys are passed to the
<code>sink.producer.*</code> to <a
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#fromProperties-java.util.Properties-">KinesisProducerConfigurations#fromProperties</a>.
- Note that some of the defaults are overwritten by
<code>KinesisConfigUtil</code>.
+ Deprecated options previously used by <code>KinesisProducer</code>.
+ Options with equivalent alternatives in
<code>KinesisAsyncClient</code> are matched
+ to their respective properties. Unsupported options are logged out to
user as warnings.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.max-concurrency</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>
+ Maximum number of allowed concurrent requests by
<code>KinesisAsyncClient</code>.
</td>
</tr>
+ <tr>
+ <td><h5>sink.http-client.read-timeout</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">360000</td>
+ <td>Integer</td>
+ <td>
+ Maximum amount of time in ms for requests to be sent by
<code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.protocol.version</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">HTTP2</td>
+ <td>String</td>
+ <td>Http version used by Kinesis Client.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.batch.max-size</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">500</td>
+ <td>Integer</td>
+ <td>Maximum batch size of elements to be passed to
<code>KinesisAsyncClient</code>to be written downstream.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-inflight</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">16</td>
+ <td>Integer</td>
+ <td>Request threshold for uncompleted requests by
<code>KinesisAsyncClient</code>before blocking new write requests.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-buffered</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>String</td>
+ <td>request buffer threshold by <code>KinesisAsyncClient</code>before
blocking new write requests.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.flush-buffer.size</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">5242880</td>
+ <td>Long</td>
+ <td>Threshold value in bytes for writer buffer in
<code>KinesisAsyncClient</code>before flushing.</td>
Review comment:
Same as the above issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]