leaves12138 commented on code in PR #7809: URL: https://github.com/apache/paimon/pull/7809#discussion_r3216662522
########## paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/DataStatisticsOperator.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.partition; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.utils.RuntimeContextUtils; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.sink.RowPartitionKeyExtractor; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import 
static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Operator that collects local partition data statistics and sends them to coordinator for global + * aggregation, then forwards global statistics to the downstream partitioner. + */ +public class DataStatisticsOperator extends AbstractStreamOperator<StatisticsOrRecord> + implements OneInputStreamOperator<InternalRow, StatisticsOrRecord>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final String operatorName; + private final TableSchema schema; + private final OperatorEventGateway operatorEventGateway; + + private transient int subtaskIndex; + private transient volatile DataStatistics localStatistics; + private transient RowPartitionKeyExtractor extractor; + private transient TypeSerializer<DataStatistics> statisticsSerializer; + + DataStatisticsOperator( + StreamOperatorParameters<StatisticsOrRecord> parameters, + String operatorName, + TableSchema schema, + OperatorEventGateway operatorEventGateway) { + super(); + this.operatorName = operatorName; + this.schema = schema; + this.operatorEventGateway = operatorEventGateway; + this.setup( + parameters.getContainingTask(), + parameters.getStreamConfig(), + parameters.getOutput()); + } + + @Override + public void open() throws Exception { + this.extractor = new RowPartitionKeyExtractor(schema); + this.statisticsSerializer = new DataStatisticsSerializer(); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + this.subtaskIndex = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()); + this.localStatistics = StatisticsUtil.createDataStatistics(); + } + + @Override + public void handleOperatorEvent(OperatorEvent event) { + checkArgument( + event instanceof StatisticsEvent, + String.format( + "Operator %s subtask %s received unexpected operator event %s", + operatorName, subtaskIndex, event.getClass())); + StatisticsEvent statisticsEvent = 
(StatisticsEvent) event; + LOG.debug( + "Operator {} subtask {} received global data event from coordinator checkpoint {}", + operatorName, + subtaskIndex, + statisticsEvent.getCheckpointId()); + DataStatistics globalStatistics = + StatisticsUtil.deserializeDataStatistics( + statisticsEvent.getStatisticsBytes(), statisticsSerializer); + if (globalStatistics != null) { + output.collect(new StreamRecord<>(StatisticsOrRecord.fromStatistics(globalStatistics))); + } + } + + @Override + public void processElement(StreamRecord<InternalRow> streamRecord) throws Exception { + InternalRow row = streamRecord.getValue(); + BinaryRow partition = extractor.partition(row); + String partitionKey = partition.toString(); Review Comment: `BinaryRow.toString()` is not a lossless partition key. `BinaryRow`/`BinarySection` does not override `toString()`, so this falls back to `Object.toString()`, which only embeds the content-based `hashCode()`. Different partition values can collide and then be merged into the same statistics entry, so the dynamic assignment can be built from incorrect weights. Could we use a canonical key here instead, e.g. copied/serialized `BinaryRow` bytes (with an appropriate serializer) or a partition-path/value string, and use the same key on the lookup side? ########## paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java: ########## @@ -562,7 +567,179 @@ public void testPartitionStrategyForPartitionedTable(CoreOptions.PartitionSinkSt strategy == CoreOptions.PartitionSinkStrategy.HASH ? 
hashStrategyResultFileCount : lessSinkParallelism; - partitionEntriesLess.forEach(x -> assertThat(x.fileCount()).isEqualTo(fileCountLess)); + if (strategy == CoreOptions.PartitionSinkStrategy.PARTITION_DYNAMIC) { + fileCountLess = Math.min(lessSinkParallelism, 4); + } + final int expectedFileCountLess = fileCountLess; + partitionEntriesLess.forEach( + x -> assertThat(x.fileCount()).isEqualTo(expectedFileCountLess)); + } + + @Test + public void testPartitionDynamicDataCorrectness() { + batchSql( + "CREATE TABLE IF NOT EXISTS dynamic_correctness (" + + "id INT, data STRING, dt STRING) PARTITIONED BY (dt)" + + " WITH (" + + "'bucket' = '-1'," + + "'partition.sink-strategy' = 'partition_dynamic'," + + "'sink.parallelism' = '4')"); + + batchSql( + "INSERT INTO dynamic_correctness VALUES " + + "(1, 'a', '20250301'), (2, 'b', '20250301'), " + + "(3, 'c', '20250302'), (4, 'd', '20250302'), " + + "(5, 'e', '20250303')"); + + List<Row> result = batchSql("SELECT * FROM dynamic_correctness ORDER BY id"); + assertThat(result).hasSize(5); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1, "a", "20250301"), + Row.of(2, "b", "20250301"), + Row.of(3, "c", "20250302"), + Row.of(4, "d", "20250302"), + Row.of(5, "e", "20250303")); + } + + @Test + public void testPartitionDynamicWithSkewedData() { + batchSql( + "CREATE TABLE IF NOT EXISTS dynamic_skewed (" + + "id INT, data STRING, dt STRING) PARTITIONED BY (dt)" + + " WITH (" + + "'bucket' = '-1'," + + "'partition.sink-strategy' = 'partition_dynamic'," + + "'sink.parallelism' = '4')"); + + // Heavily skewed: partition '20250301' gets most data + StringBuilder values = new StringBuilder(); + for (int i = 1; i <= 100; i++) { + values.append(String.format("(%d, 'data%d', '20250301'),", i, i)); + } + for (int i = 101; i <= 110; i++) { + values.append(String.format("(%d, 'data%d', '20250302'),", i, i)); + } + for (int i = 111; i <= 115; i++) { + values.append(String.format("(%d, 'data%d', '20250303'),", i, i)); + } + Review 
Comment: Most of the new IT coverage runs through batch `INSERT`, so it mainly verifies data correctness and the no-statistics fallback (`min(parallelism, 4)`). Since the new dynamic behavior is driven by checkpoint statistics and coordinator events, could we add an end-to-end streaming/checkpointed test that writes a skewed partition before/after at least one checkpoint and verifies that the hot partition is actually spread according to the updated statistics? That would cover the coordinator -> operator event -> partitioner update path, not only the direct unit-test path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
