dannycranmer commented on a change in pull request #18669: URL: https://github.com/apache/flink/pull/18669#discussion_r802781372
########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferedRequestState.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +/** + * Class holding internal state of buffered requests. + * + * @param <RequestEntryT> request type. + */ +@PublicEvolving Review comment: Why is this `@PublicEvolving`? ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferedRequestState.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +/** + * Class holding internal state of buffered requests. + * + * @param <RequestEntryT> request type. + */ +@PublicEvolving +public class BufferedRequestState<RequestEntryT extends Serializable> implements Serializable { + private final Collection<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries; + private final Long stateSize; + + public BufferedRequestState( + Collection<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries) { + this.bufferedRequestEntries = bufferedRequestEntries; Review comment: It would be better to copy the collection contents here. We care about order, so we might as well upgrade to `List`. ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferedRequestState.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +/** + * Class holding internal state of buffered requests. + * + * @param <RequestEntryT> request type. + */ +@PublicEvolving +public class BufferedRequestState<RequestEntryT extends Serializable> implements Serializable { + private final Collection<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries; + private final Long stateSize; Review comment: This can be `long` ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java ########## @@ -406,11 +403,30 @@ private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) { * a failure/restart of the application. 
*/ @Override - public List<Collection<RequestEntryT>> snapshotState() { - return Arrays.asList( - bufferedRequestEntries.stream() - .map(RequestEntryWrapper::getRequestEntry) - .collect(Collectors.toList())); + public List<BufferedRequestState<RequestEntryT>> snapshotState() { + return Collections.singletonList( + new BufferedRequestState<>( + Collections.unmodifiableCollection(bufferedRequestEntries))); + } + + protected void initialize(BufferedRequestState<RequestEntryT> state) { + this.bufferedRequestEntries.clear(); + this.bufferedRequestEntries.addAll(state.getBufferedRequestEntries()); + + long sum = 0L; + for (RequestEntryWrapper<RequestEntryT> wrapper : bufferedRequestEntries) { + if (wrapper.getSize() > maxRecordSizeInBytes) { + throw new IllegalStateException( + String.format( + "State contains record of size %d which exceeds sink maximum record size %d.", + wrapper.getSize(), maxRecordSizeInBytes)); + } + + sum = sum + wrapper.getSize(); Review comment: nit: `sum += wrapper.getSize();` ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RequestEntryWrapper.java ########## @@ -45,4 +47,25 @@ public RequestEntryT getRequestEntry() { public long getSize() { return size; } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + + if (other == null || other.getClass() != this.getClass()) { + return false; + } + + RequestEntryWrapper<RequestEntryT> that = (RequestEntryWrapper<RequestEntryT>) other; + + return Objects.equals(this.size, that.size) + && Objects.equals(this.requestEntry, that.requestEntry); + } + + @Override + public int hashCode() { + return Objects.hash(size, requestEntry); + } Review comment: Same question as above, why add this? 
########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferedRequestState.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +/** + * Class holding internal state of buffered requests. + * + * @param <RequestEntryT> request type. 
+ */ +@PublicEvolving +public class BufferedRequestState<RequestEntryT extends Serializable> implements Serializable { + private final Collection<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries; + private final Long stateSize; + + public BufferedRequestState( + Collection<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries) { + this.bufferedRequestEntries = bufferedRequestEntries; + this.stateSize = + bufferedRequestEntries.stream() + .map(RequestEntryWrapper::getSize) + .reduce(Long::sum) + .orElse(0L); + } + + public Collection<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries() { + return bufferedRequestEntries; + } + + public Long getStateSize() { Review comment: This is not used ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferedRequestState.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +/** + * Class holding internal state of buffered requests. + * + * @param <RequestEntryT> request type. + */ +@PublicEvolving +public class BufferedRequestState<RequestEntryT extends Serializable> implements Serializable { + private final Collection<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries; + private final Long stateSize; + + public BufferedRequestState( + Collection<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries) { + this.bufferedRequestEntries = bufferedRequestEntries; + this.stateSize = + bufferedRequestEntries.stream() + .map(RequestEntryWrapper::getSize) + .reduce(Long::sum) + .orElse(0L); + } + + public Collection<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries() { + return bufferedRequestEntries; + } + + public Long getStateSize() { + return stateSize; + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + + if (other == null || other.getClass() != this.getClass()) { + return false; + } + BufferedRequestState<RequestEntryT> that = (BufferedRequestState<RequestEntryT>) other; + return Arrays.equals( + this.bufferedRequestEntries.toArray(), that.bufferedRequestEntries.toArray()); + } + + @Override + public int hashCode() { + int hash = 0; + for (RequestEntryWrapper<RequestEntryT> request : bufferedRequestEntries) { + hash = Objects.hash(hash, request); + } + + return hash; + } Review comment: Why did you implement this? 
The Flink coding standards advise against this unless really necessary: https://flink.apache.org/contributing/code-style-and-quality-java.html ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferedRequestState.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +/** + * Class holding internal state of buffered requests. Review comment: Please improve this comment ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterStateSerializer.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Serializer class for {@link AsyncSinkWriter} state. + * + * @param <RequestEntryT> Writer Request Entry type + */ +@Internal +public abstract class AsyncSinkWriterStateSerializer<RequestEntryT extends Serializable> + implements SimpleVersionedSerializer<BufferedRequestState<RequestEntryT>> { + private static final long DATA_IDENTIFIER = -1; + + // Serializes state in form of + // [DATA_IDENTIFIER,NUM_OF_ELEMENTS,SIZE1,REQUEST1,SIZE2,REQUEST2....] Review comment: Use a multiline javadoc here instead of multiple single line comments ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferedRequestState.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +/** + * Class holding internal state of buffered requests. + * + * @param <RequestEntryT> request type. + */ +@PublicEvolving +public class BufferedRequestState<RequestEntryT extends Serializable> implements Serializable { Review comment: What is the point in this class? It is simply wrapping a `List`, why not just use List? ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java ########## @@ -406,11 +403,30 @@ private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) { * a failure/restart of the application. 
*/ @Override - public List<Collection<RequestEntryT>> snapshotState() { - return Arrays.asList( - bufferedRequestEntries.stream() - .map(RequestEntryWrapper::getRequestEntry) - .collect(Collectors.toList())); + public List<BufferedRequestState<RequestEntryT>> snapshotState() { + return Collections.singletonList( + new BufferedRequestState<>( + Collections.unmodifiableCollection(bufferedRequestEntries))); + } + + protected void initialize(BufferedRequestState<RequestEntryT> state) { + this.bufferedRequestEntries.clear(); + this.bufferedRequestEntries.addAll(state.getBufferedRequestEntries()); + + long sum = 0L; + for (RequestEntryWrapper<RequestEntryT> wrapper : bufferedRequestEntries) { + if (wrapper.getSize() > maxRecordSizeInBytes) { + throw new IllegalStateException( + String.format( + "State contains record of size %d which exceeds sink maximum record size %d.", + wrapper.getSize(), maxRecordSizeInBytes)); + } + + sum = sum + wrapper.getSize(); Review comment: Can you use `state.getStateSize()` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
