AHeise commented on a change in pull request #16796:
URL: https://github.com/apache/flink/pull/16796#discussion_r688488875
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -185,44 +190,57 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final SerializationSchema<RowData> valueSerialization =
createSerialization(context, valueEncodingFormat, valueProjection, null);
+ final KafkaSinkBuilder<RowData> sinkBuilder = KafkaSink.builder();
+ final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren();
+ if (transactionalIdPrefix != null) {
+ sinkBuilder.setTransactionalIdPrefix(transactionalIdPrefix);
+ }
+ final KafkaSink<RowData> kafkaSink =
+ sinkBuilder
+ .setDeliverGuarantee(deliveryGuarantee)
+ .setBootstrapServers(
+ properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString())
+ .setKafkaProducerConfig(properties)
+ .setRecordSerializer(
+ new DynamicKafkaRecordSerializationSchema(
+ topic,
+ partitioner,
+ keySerialization,
+ valueSerialization,
+ getFieldGetters(physicalChildren, keyProjection),
+ getFieldGetters(physicalChildren, valueProjection),
+ hasMetadata(),
+ getMetadataPositions(physicalChildren),
+ upsertMode))
+ .build();
if (flushMode.isEnabled() && upsertMode) {
- BufferedUpsertSinkFunction buffedSinkFunction =
- new BufferedUpsertSinkFunction(
- createKafkaProducer(keySerialization, valueSerialization),
- physicalDataType,
- keyProjection,
- context.createTypeInformation(consumedDataType),
- flushMode);
- return SinkFunctionProvider.of(buffedSinkFunction, parallelism);
- } else {
- final KafkaSinkBuilder<RowData> sinkBuilder = KafkaSink.builder();
- final List<LogicalType> physicalChildren =
- physicalDataType.getLogicalType().getChildren();
- if (transactionalIdPrefix != null) {
- sinkBuilder.setTransactionalIdPrefix(transactionalIdPrefix);
- }
- return SinkProvider.of(
- sinkBuilder
- .setDeliverGuarantee(deliveryGuarantee)
- .setBootstrapServers(
- properties
- .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
- .toString())
- .setKafkaProducerConfig(properties)
- .setRecordSerializer(
- new DynamicKafkaRecordSerializationSchema(
- topic,
- partitioner,
- keySerialization,
- valueSerialization,
- getFieldGetters(physicalChildren, keyProjection),
- getFieldGetters(physicalChildren, valueProjection),
- hasMetadata(),
- getMetadataPositions(physicalChildren),
- upsertMode))
- .build(),
- parallelism);
+ return (DataStreamSinkProvider)
Review comment:
Do we want to add a check that no one is expecting exactly-once here?
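
If such a check is wanted, it could look roughly like the sketch below. It is self-contained for illustration only: the nested `DeliveryGuarantee` enum is a local stand-in for the connector's enum, and the class name, `checkBufferedUpsertGuarantee`, and the error message are hypothetical, not part of this PR.

```java
// Hypothetical sketch; none of these names come from the PR.
class DeliveryGuaranteeCheckSketch {

    // Local stand-in for the connector's delivery guarantee enum.
    enum DeliveryGuarantee {
        EXACTLY_ONCE,
        AT_LEAST_ONCE,
        NONE
    }

    // Rejects exactly-once when the buffering upsert path would be taken.
    static void checkBufferedUpsertGuarantee(
            DeliveryGuarantee guarantee, boolean flushEnabled, boolean upsertMode) {
        if (flushEnabled && upsertMode && guarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            throw new IllegalArgumentException(
                    "Buffered upsert mode is not expected to be used with exactly-once delivery.");
        }
    }

    public static void main(String[] args) {
        // Passes: at-least-once with buffering enabled.
        checkBufferedUpsertGuarantee(DeliveryGuarantee.AT_LEAST_ONCE, true, true);
        try {
            checkBufferedUpsertGuarantee(DeliveryGuarantee.EXACTLY_ONCE, true, true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the real method the equivalent check would sit just before the `flushMode.isEnabled() && upsertMode` branch returns its provider.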
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperatorFactory.java
##########
@@ -44,7 +45,7 @@
extends AbstractStreamOperatorFactory<byte[]>
implements OneInputStreamOperatorFactory<InputT, byte[]>,
YieldingOperatorFactory<byte[]> {
- private final Sink<InputT, CommT, WriterStateT, ?> sink;
+ @VisibleForTesting public final Sink<InputT, CommT, WriterStateT, ?> sink;
Review comment:
🙈 Please add a getter for that instead of making the field public.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class ReducingUpsertWriter<WriterState> implements SinkWriter<RowData, Void, WriterState> {
+
+ private final SinkWriter<RowData, ?, WriterState> wrappedWriter;
+ private final WrappedContext wrappedContext = new WrappedContext();
+ private final int batchMaxRowNums;
+ private final Function<RowData, RowData> valueCopyFunction;
+ private final Map<RowData, Tuple2<RowData, Long>> reduceBuffer = new HashMap<>();
+ private final Function<RowData, RowData> keyExtractor;
+ private final Sink.ProcessingTimeService timeService;
+ private final long batchIntervalMs;
+
+ private boolean closed = false;
+ private int batchCount = 0;
Review comment:
I'd get rid of `batchCount` and just go with `reduceBuffer.size()`. It
makes a huge difference if the same key is updated over and over again.
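
A minimal sketch of the difference, with `String` keys and values standing in for `RowData` and a hypothetical `ReduceBufferSketch` class in place of the writer. The flush only clears the buffer and counts how often it ran.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the writer's buffering logic; not the PR's class.
class ReduceBufferSketch {
    private final Map<String, String> reduceBuffer = new HashMap<>();
    private final int batchMaxRowNums;
    private int flushes = 0;

    ReduceBufferSketch(int batchMaxRowNums) {
        this.batchMaxRowNums = batchMaxRowNums;
    }

    void write(String key, String value) {
        // A later update for the same key overwrites the earlier one.
        reduceBuffer.put(key, value);
        // Flush on the number of distinct buffered keys, not on the number of writes.
        if (reduceBuffer.size() >= batchMaxRowNums) {
            flush();
        }
    }

    void flush() {
        flushes++;
        reduceBuffer.clear();
    }

    int flushes() {
        return flushes;
    }

    int buffered() {
        return reduceBuffer.size();
    }

    public static void main(String[] args) {
        ReduceBufferSketch writer = new ReduceBufferSketch(100);
        for (int i = 0; i < 1000; i++) {
            writer.write("same-key", "value-" + i);
        }
        System.out.println("flushes=" + writer.flushes() + ", buffered=" + writer.buffered());
    }
}
```

With a separate write counter, the 1000 updates to one key would flush ten single-row batches at a limit of 100. With `size()`, the buffer keeps collapsing them into one row until a timer or checkpoint flush.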
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertSinkFunction.java
##########
@@ -56,7 +56,10 @@
/**
* The wrapper of the {@link RichSinkFunction}. It will buffer the data and emit when the buffer is
* full or timer is triggered or checkpointing.
+ *
+ * @deprecated Please use {@link ReducingUpsertSink} instead.
*/
+@Deprecated
public class BufferedUpsertSinkFunction extends RichSinkFunction<RowData>
Review comment:
Didn't you say that this guy is `@Public`? I don't see an explicit
annotation and would say that it's de facto `@Internal` and can be removed.
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
##########
@@ -231,10 +235,23 @@ public void testBufferedTableSink() {
// Test kafka producer.
DynamicTableSink.SinkRuntimeProvider provider =
actualUpsertKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
- assertThat(provider, instanceOf(SinkFunctionProvider.class));
- final SinkFunctionProvider sinkFunctionProvider = (SinkFunctionProvider) provider;
- final SinkFunction<RowData> sinkFunction = sinkFunctionProvider.createSinkFunction();
- assertThat(sinkFunction, instanceOf(BufferedUpsertSinkFunction.class));
+ assertThat(provider, instanceOf(DataStreamSinkProvider.class));
+ final DataStreamSinkProvider sinkProvider = (DataStreamSinkProvider) provider;
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ sinkProvider.consumeDataStream(env.fromElements(new BinaryRowData(1)));
+ final StreamOperatorFactory<?> sinkOperatorFactory =
+ env.getStreamGraph().getStreamNodes().stream()
+ .filter(n -> n.getOperatorName().contains("Sink"))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Expected operator with name Sink in stream graph."))
+ .getOperatorFactory();
+ assertThat(sinkOperatorFactory, instanceOf(SinkOperatorFactory.class));
+ assertThat(
+ ((SinkOperatorFactory) sinkOperatorFactory).sink,
+ instanceOf(ReducingUpsertSink.class));
Review comment:
Much better!
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class ReducingUpsertWriter<WriterState> implements SinkWriter<RowData, Void, WriterState> {
+
+ private final SinkWriter<RowData, ?, WriterState> wrappedWriter;
+ private final WrappedContext wrappedContext = new WrappedContext();
+ private final int batchMaxRowNums;
+ private final Function<RowData, RowData> valueCopyFunction;
+ private final Map<RowData, Tuple2<RowData, Long>> reduceBuffer = new HashMap<>();
+ private final Function<RowData, RowData> keyExtractor;
+ private final Sink.ProcessingTimeService timeService;
+ private final long batchIntervalMs;
+
+ private boolean closed = false;
+ private int batchCount = 0;
+ private long lastFlush = Long.MIN_VALUE;
+
+ ReducingUpsertWriter(
+ SinkWriter<RowData, ?, WriterState> wrappedWriter,
+ DataType physicalDataType,
+ int[] keyProjection,
+ SinkBufferFlushMode bufferFlushMode,
+ Sink.ProcessingTimeService timeService,
+ Function<RowData, RowData> valueCopyFunction) {
+ checkArgument(bufferFlushMode != null && bufferFlushMode.isEnabled());
+ this.wrappedWriter = checkNotNull(wrappedWriter);
+ this.timeService = checkNotNull(timeService);
+ registerFlush();
+ this.batchMaxRowNums = bufferFlushMode.getBatchSize();
+ this.batchIntervalMs = bufferFlushMode.getBatchIntervalMs();
+
+ List<LogicalType> fields = physicalDataType.getLogicalType().getChildren();
+ final RowData.FieldGetter[] keyFieldGetters =
+ Arrays.stream(keyProjection)
+ .mapToObj(
+ targetField ->
+ RowData.createFieldGetter(
+ fields.get(targetField), targetField))
+ .toArray(RowData.FieldGetter[]::new);
+ this.keyExtractor = rowData -> createProjectedRow(rowData, RowKind.INSERT, keyFieldGetters);
+ this.valueCopyFunction = valueCopyFunction;
+ }
+
+ @Override
+ public void write(RowData element, Context context) throws IOException, InterruptedException {
+ wrappedContext.setContext(context);
+ addToBuffer(element, context.timestamp());
+ }
+
+ @Override
+ public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
+ flush();
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<WriterState> snapshotState() throws IOException {
+ return wrappedWriter.snapshotState();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closed) {
+ closed = true;
+ wrappedWriter.close();
+ }
+ }
+
+ private void addToBuffer(RowData row, Long timestamp) throws IOException, InterruptedException {
+ RowData key = keyExtractor.apply(row);
+ RowData value = valueCopyFunction.apply(row);
+ reduceBuffer.put(key, new Tuple2<>(changeFlag(value), timestamp));
+ batchCount++;
+
+ if (batchCount >= batchMaxRowNums) {
+ flush();
+ }
+ }
+
+ private void registerFlush() {
+ if (closed) {
+ return;
+ }
+ timeService.registerProcessingTimer(
+ System.currentTimeMillis() + batchIntervalMs,
+ (t) -> {
+ if (t >= lastFlush + batchIntervalMs) {
+ flush();
+ }
+ registerFlush();
Review comment:
I wouldn't do that here but in `flush`. Let's say that the timer
triggers every 10s and the size-based flush happens at 1s.
Then the current way would trigger the time-based flush at 20s. I'd expect
11s.
The easiest way would be to keep your check and just move `registerFlush`
into `flush`. You can even reduce the `currentTimeMillis` calls to one then.
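
The 20s versus 11s scenario can be replayed with a simulated clock. This is a sketch under assumptions, not the writer itself: `FlushTimerSketch` and its timer queue are hypothetical stand-ins for the processing time service, timers cannot be cancelled, and `flush` is assumed to record its time in `lastFlush`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simulated clock and timer queue; not Flink's processing time service.
class FlushTimerSketch {
    private final long batchIntervalMs;
    private final boolean registerInFlush;
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<Long> flushTimes = new ArrayList<>();
    private long lastFlush = Long.MIN_VALUE;

    FlushTimerSketch(long batchIntervalMs, boolean registerInFlush) {
        this.batchIntervalMs = batchIntervalMs;
        this.registerInFlush = registerInFlush;
        timers.add(batchIntervalMs); // first timer, registered at simulated time 0
    }

    // Assumption: every flush records when it ran.
    void flush(long now) {
        flushTimes.add(now);
        lastFlush = now;
        if (registerInFlush) {
            timers.add(now + batchIntervalMs); // suggested: next timer is relative to this flush
        }
    }

    // Fires every timer due up to and including 'now', in time order.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek() <= now) {
            long t = timers.poll();
            if (t >= lastFlush + batchIntervalMs) {
                flush(t);
            }
            if (!registerInFlush) {
                timers.add(t + batchIntervalMs); // current: re-register inside the callback
            }
        }
    }

    static List<Long> run(boolean registerInFlush) {
        FlushTimerSketch writer = new FlushTimerSketch(10_000, registerInFlush);
        writer.advanceTo(1_000);
        writer.flush(1_000); // size-based flush at 1s
        writer.advanceTo(25_000);
        return writer.flushTimes;
    }

    public static void main(String[] args) {
        System.out.println("register in callback: " + run(false));
        System.out.println("register in flush:    " + run(true));
    }
}
```

With registration in the callback, the first time-based flush after the size-based one lands at 20s. With registration in `flush`, the stale 10s timer fails the check and does nothing, and the timer registered at 1s flushes at 11s.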
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]