wuchong commented on a change in pull request #15434: URL: https://github.com/apache/flink/pull/15434#discussion_r604573177
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java ##########

+    /** Flush the data into the inner sink function and send the data into the sink. */
+    private synchronized void writeRecord(RowData row, Long timestamp) {
+        System.out.println(row);

Review comment: remove print.
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java ##########

+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>

Review comment: The class can be more generic, e.g. `BufferedUpsertSinkFunction`, and please add a `serialVersionUID`.
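A minimal sketch of what that could look like (the renamed class follows the suggestion above; the constant value is illustrative, not part of the actual patch):

```java
/** Buffers upsert records and flushes them to the wrapped sink function. */
public class BufferedUpsertSinkFunction extends RichSinkFunction<RowData>
        implements CheckpointedFunction, CheckpointListener {

    // Sink functions are serialized when the job graph is deployed,
    // so give the class an explicit serialVersionUID.
    private static final long serialVersionUID = 1L;

    // ... remaining fields and methods stay as in the current patch ...
}
```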
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java ##########

+        this.batchMaxRowNums = batchMaxRowNums;
+        this.batchIntervalMs = batchIntervalMs;

Review comment: Add checks that they must be greater than zero.
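One way to add that validation in the constructor, using Flink's `Preconditions` (a sketch; the error messages and the switch to a primitive `int` parameter are assumptions):

```java
// assumes: import static org.apache.flink.util.Preconditions.checkArgument;
// assumes: import static org.apache.flink.util.Preconditions.checkNotNull;
private BufferedUpsertKafkaSinkFunction(
        RichSinkFunction<RowData> producer,
        DataType physicalDataType,
        int[] keyProjection,
        int batchMaxRowNums,
        Duration batchInterval) {
    checkArgument(batchMaxRowNums > 0, "batchMaxRowNums must be greater than zero.");
    checkArgument(
            batchInterval != null && batchInterval.toMillis() > 0,
            "batchInterval must be greater than zero.");
    this.producer = checkNotNull(producer);
    this.physicalDataType = checkNotNull(physicalDataType);
    this.keyProjection = checkNotNull(keyProjection);
    this.batchMaxRowNums = batchMaxRowNums;
    this.batchInterval = batchInterval;
}
```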
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java ##########

+    /** Flush the data into the inner sink function and send the data into the sink. */
+    private synchronized void writeRecord(RowData row, Long timestamp) {

Review comment: call `checkFlushException` at the beginning of this method.
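Roughly, the method would then start by surfacing any error recorded by the flush timer thread; the body below only marks where the existing buffering logic stays and is not the actual implementation:

```java
private synchronized void writeRecord(RowData row, Long timestamp) {
    // Fail fast if the asynchronous flush triggered by the timer has already failed.
    checkFlushException();
    // ... existing buffering logic (update reduceBuffer, increment batchCount) ...
}
```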
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java ##########

+    private synchronized void addToBuffer(RowData row, Long timestamp) throws Exception {

Review comment: don't need `synchronized` for this method.
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java ##########

+        this.valueCopier =
+                rowData -> {
+                    GenericRowData copiedRowData =
+                            new GenericRowData(rowData.getRowKind(), rowData.getArity());
+                    for (int i = 0; i < rowData.getArity(); i++) {
+                        copiedRowData.setField(i, ((GenericRowData) rowData).getField(i));
+                    }
+                    return copiedRowData;
+                };

Review comment: This is not safe: the `BinaryStringData` may also be reused. We should use a `TypeSerializer` to copy records, and we don't need to copy records at all if object reuse is not enabled. You can use `getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()` in the `open()` method to find out whether object reuse is enabled, and then use `Function.identity()` as the `valueCopier`.
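A sketch of that approach in `open()`, assuming a `RowData` serializer can be obtained here via `InternalTypeInfo` (how the serializer is built is an assumption, not part of the original patch):

```java
// assumes: import org.apache.flink.api.common.typeutils.TypeSerializer;
// assumes: import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
// assumes: import org.apache.flink.table.types.logical.RowType;
if (getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()) {
    // With object reuse, the incoming RowData (including nested BinaryStringData)
    // may be mutated after invoke() returns, so buffer a deep copy.
    TypeSerializer<RowData> serializer =
            InternalTypeInfo.of((RowType) physicalDataType.getLogicalType())
                    .createSerializer(getRuntimeContext().getExecutionConfig());
    this.valueCopier = serializer::copy;
} else {
    // Without object reuse the record is exclusively ours, so no copy is needed.
    this.valueCopier = Function.identity();
}
```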
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java ##########

+    private final Integer batchMaxRowNums;

Review comment: use primitive int to make sure it's not null.
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java ##########

+    private final Duration batchIntervalMs;

Review comment: `batchIntervalMs` -> `batchInterval`
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java ##########

+    private synchronized void addToBuffer(RowData row, Long timestamp) throws Exception {
+        if (batchCount >= batchMaxRowNums) {
+            flush();
+        }
+        writeRecord(row, timestamp);

Review comment: We can combine `addToBuffer` and `writeRecord` into one method, and please do the flush check after `writeRecord` to make sure the buffer is flushed once it reaches the batch max number.
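A sketch of the combined method, keeping the synchronization that previously lived on `writeRecord` since the flush timer runs on another thread (the buffer update is reconstructed from the fields declared in this class and may not match the actual patch; the point is flushing after the record has been buffered):

```java
private synchronized void addToBuffer(RowData row, Long timestamp) throws Exception {
    checkFlushException();
    // reduce on the key: the latest value per key wins within a batch
    reduceBuffer.put(keyExtractor.apply(row), Tuple2.of(valueCopier.apply(row), timestamp));
    batchCount++;
    // flush after buffering, so the batch size limit is actually honored
    if (batchCount >= batchMaxRowNums) {
        flush();
    }
}
```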
+ */
[... remainder of this diff is identical to the file quoted above; only the hunk under discussion is shown ...]
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {

Review comment: Should also override `notifyCheckpointAborted`.
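A possible shape for that override (a sketch only, mirroring how `notifyCheckpointComplete` forwards the notification to the wrapped producer):

```java
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
    // Forward the abort notification to the wrapped producer, same as notifyCheckpointComplete.
    if (producer instanceof CheckpointListener) {
        ((CheckpointListener) producer).notifyCheckpointAborted(checkpointId);
    }
}
```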
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########

@@ -0,0 +1,322 @@
[... same file as above; only the hunk under discussion is shown ...]
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {

Review comment: Move this method beside `snapshotState`.
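That is, roughly this grouping (method bodies copied from the diff above), so the two `CheckpointedFunction` methods sit next to each other:

```java
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Delegate state restore to the wrapped producer.
    if (producer instanceof CheckpointedFunction) {
        ((CheckpointedFunction) producer).initializeState(context);
    }
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Flush the buffer before the wrapped producer snapshots its state.
    flush();
    if (producer instanceof CheckpointedFunction) {
        ((CheckpointedFunction) producer).snapshotState(context);
    }
}
```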
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########

@@ -242,6 +248,7 @@ public int hashCode() {
                 partitioner,
                 semantic,
                 upsertMode,
+                sinkFunctionCreator,

Review comment: This breaks the contract of `hashCode` and `equals`: equal objects must have the same hash code.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########

@@ -155,6 +161,20 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         Integer parallelism = tableOptions.get(FactoryUtil.SINK_PARALLELISM);
+        KafkaDynamicSink.SinkFunctionProviderCreator creator;
+        Duration interval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL);
+        Integer batchSize = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS);
+        if (batchSize > 0 && interval.toMillis() > 0) {

Review comment: We should enable buffering when either of them is larger than zero, not only when both are.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########

@@ -155,6 +161,20 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         Integer parallelism = tableOptions.get(FactoryUtil.SINK_PARALLELISM);
+        KafkaDynamicSink.SinkFunctionProviderCreator creator;
+        Duration interval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL);
+        Integer batchSize = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS);
+        if (batchSize > 0 && interval.toMillis() > 0) {
+            creator =
+                    BufferedUpsertKafkaSinkFunction.createBufferedSinkFunction(
+                            schema.toPhysicalRowDataType(),
+                            keyValueProjections.f0,
+                            batchSize,
+                            interval);

Review comment: Personally, I think all of the runtime function initialization should happen in `KafkaDynamicSink#getSinkRuntimeProvider`. A `DynamicTableSink` should only hold logical information, e.g. `batchSize` and `flushInterval`; holding a `SinkFunctionProviderCreator` makes `equals` and `hashCode` hard to implement. This would avoid introducing `SinkFunctionProviderCreator` at all and would make sure we use the correct, consistent physical DataType.
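For illustration, the wrapping could then live entirely inside the sink. This is only a rough sketch: `createKafkaProducer` stands in for the existing producer-construction logic, the field names (`upsertMode`, `batchSize`, `flushInterval`, `physicalDataType`, `keyProjection`, `parallelism`) are assumptions, and it presumes the `BufferedUpsertKafkaSinkFunction` constructor is reachable from the sink. It would also sidestep the `hashCode`/`equals` concern above, since the sink would only store the two option values:

```java
// Sketch: KafkaDynamicSink keeps only logical options and decides about wrapping here.
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
    final RichSinkFunction<RowData> kafkaProducer = createKafkaProducer(context); // stand-in

    // Buffering only makes sense in upsert mode and when at least one flush option is set.
    final boolean bufferFlushEnabled =
            upsertMode && (batchSize > 0 || flushInterval.toMillis() > 0);
    if (!bufferFlushEnabled) {
        return SinkFunctionProvider.of(kafkaProducer, parallelism);
    }

    return SinkFunctionProvider.of(
            new BufferedUpsertKafkaSinkFunction(
                    kafkaProducer, physicalDataType, keyProjection, batchSize, flushInterval),
            parallelism);
}
```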
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########

@@ -0,0 +1,322 @@
[... same file as above; only the hunk under discussion is shown ...]
+    /** Flush the data into the inner sink function and send the data into the sink. */
+    private synchronized void writeRecord(RowData row, Long timestamp) {
+        System.out.println(row);
+        RowData key = keyExtractor.apply(row);
+        RowData value = valueCopier.apply(row);
+        reduceBuffer.put(key, new Tuple2<>(changeFlag(value), timestamp));
+        batchCount++;
+    }
+
+    private synchronized void flush() throws Exception {
+        for (Tuple2<RowData, Long> value : reduceBuffer.values()) {
+            wrappedContext.setTimestamp(value.f1);
+            System.out.println(value.f0);

Review comment: Remove the `System.out.println` calls here and in `writeRecord`.
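If some of that tracing is still wanted, a debug-level log statement would be the usual replacement (just a suggestion; the class already defines a `LOG` field):

```java
// Instead of writing to stdout, log at debug level so it can be switched off in production.
LOG.debug("Buffering upsert record: {}", row);
```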
##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunctionTest.java
##########

@@ -0,0 +1,301 @@
[... same test file as quoted in full below; only the hunk under discussion is shown ...]
+    private static KafkaDynamicSink.SinkFunctionProviderCreator creator =
+            BufferedUpsertKafkaSinkFunction.createBufferedSinkFunction(
+                    SCHEMA.toSinkRowDataType(), new int[] {keyIndices}, BATCH_SIZE, FLUSH_INTERVAL);

Review comment: Please disable the `FLUSH_INTERVAL`, otherwise the test is not stable.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########

@@ -0,0 +1,322 @@
+/*
+ * (Apache License header omitted; identical to the license text quoted earlier)
+ */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.RowKind; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow; +import static org.apache.flink.types.RowKind.DELETE; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; + +/** + * The wrapper of the {@link RichSinkFunction}. It will buffer the data and emit when the buffer is + * full or timer is triggered or checkpointing. + */ +public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData> + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = + LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class); + + // -------------------------------------------------------------------------------------------- + // Config + // -------------------------------------------------------------------------------------------- + + private final RichSinkFunction<RowData> producer; + private final Integer batchMaxRowNums; + private final Duration batchIntervalMs; + private final DataType physicalDataType; + private final int[] keyProjection; + private boolean closed; + + // -------------------------------------------------------------------------------------------- + // Writer and buffer + // -------------------------------------------------------------------------------------------- + + private int batchCount = 0; + private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer; + private transient WrappedContext wrappedContext; + private transient Function<RowData, RowData> keyExtractor; + private transient Function<RowData, RowData> valueCopier; + + // -------------------------------------------------------------------------------------------- + // Timer attributes + // -------------------------------------------------------------------------------------------- + + private transient ScheduledExecutorService scheduler; + private transient ScheduledFuture<?> scheduledFuture; + private transient volatile Exception flushException; + + public static KafkaDynamicSink.SinkFunctionProviderCreator createBufferedSinkFunction( + DataType dataType, + int[] keyProjection, + Integer batchMaxRowNums, + Duration batchIntervalMs) { + return (producer, parallelism) -> + 
SinkFunctionProvider.of( + new BufferedUpsertKafkaSinkFunction( + producer, + dataType, + keyProjection, + batchMaxRowNums, + batchIntervalMs), + parallelism); + } + + private BufferedUpsertKafkaSinkFunction( + RichSinkFunction<RowData> producer, + DataType physicalDataType, + int[] keyProjection, + Integer batchMaxRowNums, + Duration batchIntervalMs) { + this.producer = producer; + this.physicalDataType = physicalDataType; + this.keyProjection = keyProjection; + this.batchMaxRowNums = batchMaxRowNums; + this.batchIntervalMs = batchIntervalMs; + } + + @Override + public void open(Configuration parameters) throws Exception { + // init variable + reduceBuffer = new HashMap<>(); + wrappedContext = new WrappedContext(); + closed = false; + + // create keyExtractor and value copier + List<LogicalType> fields = physicalDataType.getLogicalType().getChildren(); + final RowData.FieldGetter[] keyFieldGetters = + Arrays.stream(keyProjection) + .mapToObj( + targetField -> + RowData.createFieldGetter( + fields.get(targetField), targetField)) + .toArray(RowData.FieldGetter[]::new); + this.keyExtractor = rowData -> createProjectedRow(rowData, RowKind.INSERT, keyFieldGetters); + this.valueCopier = + rowData -> { + GenericRowData copiedRowData = + new GenericRowData(rowData.getRowKind(), rowData.getArity()); + for (int i = 0; i < rowData.getArity(); i++) { + copiedRowData.setField(i, ((GenericRowData) rowData).getField(i)); + } + return copiedRowData; + }; + + // register timer + this.scheduler = + Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("upsert-kafka-sink-function")); + this.scheduledFuture = + this.scheduler.scheduleWithFixedDelay( + () -> { + synchronized (BufferedUpsertKafkaSinkFunction.this) { + if (!closed) { + try { + flush(); + } catch (Exception e) { + flushException = e; + } + } + } + }, + batchIntervalMs.toMillis(), + batchIntervalMs.toMillis(), + TimeUnit.MILLISECONDS); + + producer.open(parameters); + } + + @Override + public void setRuntimeContext(RuntimeContext t) { + producer.setRuntimeContext(t); + } + + @Override + public RuntimeContext getRuntimeContext() { + return producer.getRuntimeContext(); + } + + @Override + public void invoke(RowData value, Context context) throws Exception { + wrappedContext.setContext(context); + addToBuffer(value, context.timestamp()); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + if (producer instanceof CheckpointListener) { + ((CheckpointListener) producer).notifyCheckpointComplete(checkpointId); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + flush(); + if (producer instanceof CheckpointedFunction) { + ((CheckpointedFunction) producer).snapshotState(context); + } + } + + @Override + public synchronized void close() throws Exception { + if (!closed) { + closed = true; + + if (this.scheduledFuture != null) { + scheduledFuture.cancel(false); + this.scheduler.shutdown(); + } + + if (batchCount > 0) { + try { + flush(); + } catch (Exception e) { + LOG.warn("Writing records to kafka failed.", e); + throw new RuntimeException("Writing records to kafka failed.", e); + } + } + + producer.close(); + } + super.close(); + checkFlushException(); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + if (producer instanceof CheckpointedFunction) { + ((CheckpointedFunction) producer).initializeState(context); + } + } + + // 
-------------------------------------------------------------------------------------------- + + private synchronized void addToBuffer(RowData row, Long timestamp) throws Exception { + if (batchCount >= batchMaxRowNums) { + flush(); + } + writeRecord(row, timestamp); + } + + /** Flush the data into the inner sink function and send the data into the sink. */ + private synchronized void writeRecord(RowData row, Long timestamp) { + System.out.println(row); + RowData key = keyExtractor.apply(row); + RowData value = valueCopier.apply(row); + reduceBuffer.put(key, new Tuple2<>(changeFlag(value), timestamp)); + batchCount++; + } + + private synchronized void flush() throws Exception { Review comment: call `checkFlushException` at the beginning of this method. ########## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunctionTest.java ########## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; + +import org.junit.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.types.RowKind.DELETE; +import static org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; +import static org.junit.Assert.assertEquals; + +/** Test for {@link BufferedUpsertKafkaSinkFunction}. 
*/ +public class BufferedUpsertKafkaSinkFunctionTest { + + private static final ResolvedSchema SCHEMA = + ResolvedSchema.of( + Column.physical("id", DataTypes.INT().notNull()), + Column.physical("title", DataTypes.STRING().notNull()), + Column.physical("author", DataTypes.STRING()), + Column.physical("price", DataTypes.DOUBLE()), + Column.physical("qty", DataTypes.INT()), + Column.physical("ts", DataTypes.TIMESTAMP(3))); + + private static final int keyIndices = 0; + private static final int TIMESTAMP_INDICES = 5; + private static final int BATCH_SIZE = 4; + private static final Duration FLUSH_INTERVAL = Duration.ofMillis(60_000); + + public static final RowData[] TEST_DATA = { + GenericRowData.ofKind( + INSERT, + 1001, + StringData.fromString("Java public for dummies"), + StringData.fromString("Tan Ah Teck"), + 11.11, + 11, + TimestampData.fromInstant(Instant.parse("2021-03-30T15:00:00Z"))), + GenericRowData.ofKind( + INSERT, + 1002, + StringData.fromString("More Java for dummies"), + StringData.fromString("Tan Ah Teck"), + 22.22, + 22, + TimestampData.fromInstant(Instant.parse("2021-03-30T16:00:00Z"))), + GenericRowData.ofKind( + INSERT, + 1004, + StringData.fromString("A Cup of Java"), + StringData.fromString("Kumar"), + 44.44, + 44, + TimestampData.fromInstant(Instant.parse("2021-03-30T17:00:00Z"))), + GenericRowData.ofKind( + UPDATE_AFTER, + 1004, + StringData.fromString("A Teaspoon of Java"), + StringData.fromString("Kevin Jones"), + 55.55, + 55, + TimestampData.fromInstant(Instant.parse("2021-03-30T18:00:00Z"))), + GenericRowData.ofKind( + UPDATE_AFTER, + 1004, + StringData.fromString("A Teaspoon of Java 1.4"), + StringData.fromString("Kevin Jones"), + 66.66, + 66, + TimestampData.fromInstant(Instant.parse("2021-03-30T19:00:00Z"))), + GenericRowData.ofKind( + UPDATE_AFTER, + 1004, + StringData.fromString("A Teaspoon of Java 1.5"), + StringData.fromString("Kevin Jones"), + 77.77, + 77, + TimestampData.fromInstant(Instant.parse("2021-03-30T20:00:00Z"))), + GenericRowData.ofKind( + DELETE, + 1004, + StringData.fromString("A Teaspoon of Java 1.8"), + StringData.fromString("Kevin Jones"), + null, + 1010, + TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z"))) + }; + + private static KafkaDynamicSink.SinkFunctionProviderCreator creator = + BufferedUpsertKafkaSinkFunction.createBufferedSinkFunction( + SCHEMA.toSinkRowDataType(), new int[] {keyIndices}, BATCH_SIZE, FLUSH_INTERVAL); + + @Test + public void testWirteData() throws Exception { + MockedSinkFunction sinkFunction = new MockedSinkFunction(); + createBufferedSinkAndWriteData(sinkFunction); + + HashMap<Integer, List<RowData>> expected = new HashMap<>(); + expected.put( + 1001, + Collections.singletonList( + GenericRowData.ofKind( + UPDATE_AFTER, + 1001, + StringData.fromString("Java public for dummies"), + StringData.fromString("Tan Ah Teck"), + 11.11, + 11, + TimestampData.fromInstant(Instant.parse("2021-03-30T15:00:00Z"))))); + expected.put( + 1002, + Collections.singletonList( + GenericRowData.ofKind( + UPDATE_AFTER, + 1002, + StringData.fromString("More Java for dummies"), + StringData.fromString("Tan Ah Teck"), + 22.22, + 22, + TimestampData.fromInstant(Instant.parse("2021-03-30T16:00:00Z"))))); + expected.put( + 1004, + Collections.singletonList( + GenericRowData.ofKind( + UPDATE_AFTER, + 1004, + StringData.fromString("A Teaspoon of Java"), + StringData.fromString("Kevin Jones"), + 55.55, + 55, + TimestampData.fromInstant(Instant.parse("2021-03-30T18:00:00Z"))))); + + compareCompactedResult(expected, 
sinkFunction.rowDataCollectors);

Review comment: Why not compare the row list received by the sink function directly, instead of grouping it into a map first?
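A direct comparison might look roughly like this. It is only a sketch: it reuses the three compacted rows from the expected map above, assumes `rowDataCollectors` is the flat `List<RowData>` collected by `MockedSinkFunction`, needs an extra `java.util.HashSet` import, and compares as a set because the buffer is a `HashMap` and the emission order is not fixed:

```java
List<RowData> expectedRows =
        Arrays.asList(
                GenericRowData.ofKind(
                        UPDATE_AFTER,
                        1001,
                        StringData.fromString("Java public for dummies"),
                        StringData.fromString("Tan Ah Teck"),
                        11.11,
                        11,
                        TimestampData.fromInstant(Instant.parse("2021-03-30T15:00:00Z"))),
                GenericRowData.ofKind(
                        UPDATE_AFTER,
                        1002,
                        StringData.fromString("More Java for dummies"),
                        StringData.fromString("Tan Ah Teck"),
                        22.22,
                        22,
                        TimestampData.fromInstant(Instant.parse("2021-03-30T16:00:00Z"))),
                GenericRowData.ofKind(
                        UPDATE_AFTER,
                        1004,
                        StringData.fromString("A Teaspoon of Java"),
                        StringData.fromString("Kevin Jones"),
                        55.55,
                        55,
                        TimestampData.fromInstant(Instant.parse("2021-03-30T18:00:00Z"))));

// Compare the compacted rows the mocked sink actually received, ignoring order.
assertEquals(new HashSet<>(expectedRows), new HashSet<>(sinkFunction.rowDataCollectors));
```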
