dannycranmer commented on a change in pull request #18669:
URL: https://github.com/apache/flink/pull/18669#discussion_r802570306
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
##########
@@ -126,13 +126,19 @@
getMaxRecordSizeInBytes(),
failOnError,
streamName,
- kinesisClientProperties);
+ kinesisClientProperties,
+ getInitialState(states));
+ }
+
+ private BufferedRequestState<PutRecordsRequestEntry> getInitialState(
+ List<BufferedRequestState<PutRecordsRequestEntry>> states) {
+ return states.isEmpty() ? BufferedRequestState.emptyState() :
states.get(0);
Review comment:
Why are we discarding the other items in the list here? Are we expecting
the list to be of size 1? If so, can we validate that and fail if that is not
the case?
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializer.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Kinesis Streams implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public final class KinesisDataStreamsStateSerializer
+ extends AsyncSinkWriterStateSerializer<PutRecordsRequestEntry> {
+ @Override
+ protected void serializeRequestToStream(PutRecordsRequestEntry request,
DataOutputStream out)
+ throws IOException {
+ out.write(request.data().asByteArrayUnsafe());
+ }
+
+ @Override
+ protected PutRecordsRequestEntry deserializeRequestFromStream(
+ long requestSize, DataInputStream in) throws IOException {
+ byte[] requestData = new byte[(int) requestSize];
+ in.read(requestData);
+ return
PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArray(requestData)).build();
+ }
+
+ @Override
+ public int getVersion() {
+ return 0;
Review comment:
We should set this to 1, as 0 is equivalent to unset
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializer.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Kinesis Streams implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public final class KinesisDataStreamsStateSerializer
+ extends AsyncSinkWriterStateSerializer<PutRecordsRequestEntry> {
+ @Override
+ protected void serializeRequestToStream(PutRecordsRequestEntry request,
DataOutputStream out)
+ throws IOException {
+ out.write(request.data().asByteArrayUnsafe());
+ }
+
+ @Override
+ protected PutRecordsRequestEntry deserializeRequestFromStream(
+ long requestSize, DataInputStream in) throws IOException {
+ byte[] requestData = new byte[(int) requestSize];
+ in.read(requestData);
+ return
PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArray(requestData)).build();
Review comment:
We are not setting the partition key here. Also, it seems this
implementation needs to be tied to the `ElementConverter` somehow.
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -406,11 +403,30 @@ private void addEntryToBuffer(RequestEntryT entry,
boolean insertAtHead) {
* a failure/restart of the application.
*/
@Override
- public List<Collection<RequestEntryT>> snapshotState() {
- return Arrays.asList(
- bufferedRequestEntries.stream()
- .map(RequestEntryWrapper::getRequestEntry)
- .collect(Collectors.toList()));
+ public List<BufferedRequestState<RequestEntryT>> snapshotState() {
+ return Collections.singletonList(
+ new BufferedRequestState<>(
+
Collections.unmodifiableCollection(bufferedRequestEntries)));
+ }
+
+ protected void initialize(BufferedRequestState<RequestEntryT> state) {
+ this.bufferedRequestEntries.clear();
Review comment:
Do we not expect this to be empty already? We should add validation to
assert this. However, based on my other feedback, if this method should only
be called during construction, we should make it private.
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -406,11 +403,30 @@ private void addEntryToBuffer(RequestEntryT entry,
boolean insertAtHead) {
* a failure/restart of the application.
*/
@Override
- public List<Collection<RequestEntryT>> snapshotState() {
- return Arrays.asList(
- bufferedRequestEntries.stream()
- .map(RequestEntryWrapper::getRequestEntry)
- .collect(Collectors.toList()));
+ public List<BufferedRequestState<RequestEntryT>> snapshotState() {
+ return Collections.singletonList(
+ new BufferedRequestState<>(
+
Collections.unmodifiableCollection(bufferedRequestEntries)));
+ }
+
+ protected void initialize(BufferedRequestState<RequestEntryT> state) {
+ this.bufferedRequestEntries.clear();
+ this.bufferedRequestEntries.addAll(state.getBufferedRequestEntries());
+
+ long sum = 0L;
+ for (RequestEntryWrapper<RequestEntryT> wrapper :
bufferedRequestEntries) {
+ if (wrapper.getSize() > maxRecordSizeInBytes) {
+ throw new IllegalStateException(
Review comment:
How about other validations, such as the number of buffered records
(`maxBufferedRequests`)? Maybe we should allow this condition.
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
##########
@@ -97,6 +126,7 @@
this.metrics = context.metricGroup();
this.numRecordsOutErrorsCounter =
metrics.getNumRecordsOutErrorsCounter();
this.client = buildClient(kinesisClientProperties);
+ initialize(state);
Review comment:
Can this go in the super class? Seems better to pass the `state` to
`AsyncSinkWriter` and let it handle the init?
##########
File path:
flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
##########
@@ -105,11 +105,18 @@
getMaxRecordSizeInBytes(),
failOnError,
deliveryStreamName,
- firehoseClientProperties);
+ firehoseClientProperties,
+ getInitialState(states));
+ }
+
+ private BufferedRequestState<Record> getInitialState(
+ List<BufferedRequestState<Record>> states) {
+ return states.isEmpty() ? BufferedRequestState.emptyState() :
states.get(0);
Review comment:
Same question as for the KDS sink (`KinesisDataStreamsSink`).
##########
File path:
flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializer.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Kinesis Firehose implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public class KinesisFirehoseStateSerializer extends
AsyncSinkWriterStateSerializer<Record> {
+ @Override
+ protected void serializeRequestToStream(Record request, DataOutputStream
out)
+ throws IOException {
+ out.write(request.data().asByteArrayUnsafe());
+ }
+
+ @Override
+ protected Record deserializeRequestFromStream(long requestSize,
DataInputStream in)
+ throws IOException {
+ byte[] requestData = new byte[(int) requestSize];
+ in.read(requestData);
+ return
Record.builder().data(SdkBytes.fromByteArray(requestData)).build();
+ }
+
+ @Override
+ public int getVersion() {
+ return 0;
Review comment:
We should set this to 1, as 0 is equivalent to unset
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializer.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Kinesis Streams implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public final class KinesisDataStreamsStateSerializer
Review comment:
I cannot see any unit tests to cover the SerDe
##########
File path:
flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializer.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Kinesis Firehose implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public class KinesisFirehoseStateSerializer extends
AsyncSinkWriterStateSerializer<Record> {
Review comment:
I cannot see any unit tests to cover the SerDe
##########
File path:
flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
##########
@@ -118,6 +147,7 @@ private static FirehoseAsyncClient createFirehoseClient(
this.numRecordsOutErrorsCounter =
metrics.getNumRecordsOutErrorsCounter();
this.httpClient = createHttpClient(firehoseClientProperties);
this.firehoseClient = createFirehoseClient(firehoseClientProperties,
httpClient);
+ initialize(initialState);
Review comment:
Same as for KDS
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializer.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Kinesis Streams implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public final class KinesisDataStreamsStateSerializer
+ extends AsyncSinkWriterStateSerializer<PutRecordsRequestEntry> {
+ @Override
+ protected void serializeRequestToStream(PutRecordsRequestEntry request,
DataOutputStream out)
+ throws IOException {
+ out.write(request.data().asByteArrayUnsafe());
Review comment:
Rather than just ripping out the data, can we serialise the whole object
instead?
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -406,11 +403,30 @@ private void addEntryToBuffer(RequestEntryT entry,
boolean insertAtHead) {
* a failure/restart of the application.
*/
@Override
- public List<Collection<RequestEntryT>> snapshotState() {
- return Arrays.asList(
- bufferedRequestEntries.stream()
- .map(RequestEntryWrapper::getRequestEntry)
- .collect(Collectors.toList()));
+ public List<BufferedRequestState<RequestEntryT>> snapshotState() {
+ return Collections.singletonList(
+ new BufferedRequestState<>(
+
Collections.unmodifiableCollection(bufferedRequestEntries)));
+ }
+
+ protected void initialize(BufferedRequestState<RequestEntryT> state) {
+ this.bufferedRequestEntries.clear();
+ this.bufferedRequestEntries.addAll(state.getBufferedRequestEntries());
+
+ long sum = 0L;
+ for (RequestEntryWrapper<RequestEntryT> wrapper :
bufferedRequestEntries) {
+ if (wrapper.getSize() > maxRecordSizeInBytes) {
+ throw new IllegalStateException(
+ String.format(
+ "State contains record of size %d which
exceeds sink maximum record size %d.",
+ wrapper.getSize(), maxRecordSizeInBytes));
+ }
+
+ sum = sum + wrapper.getSize();
+ }
+
+ this.bufferedRequestEntriesTotalSizeInBytes = sum;
+ flushIfAble();
Review comment:
Is this the right time to flush? Would this state restore be called
while the operator is in the `initializing` state? If so, we probably do not
want to flush here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]