leaves12138 commented on code in PR #7809:
URL: https://github.com/apache/paimon/pull/7809#discussion_r3216662522


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/DataStatisticsOperator.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Operator that collects local partition data statistics and sends them to 
coordinator for global
+ * aggregation, then forwards global statistics to the downstream partitioner.
+ */
+public class DataStatisticsOperator extends 
AbstractStreamOperator<StatisticsOrRecord>
+        implements OneInputStreamOperator<InternalRow, StatisticsOrRecord>, 
OperatorEventHandler {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String operatorName;
+    private final TableSchema schema;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient int subtaskIndex;
+    private transient volatile DataStatistics localStatistics;
+    private transient RowPartitionKeyExtractor extractor;
+    private transient TypeSerializer<DataStatistics> statisticsSerializer;
+
+    DataStatisticsOperator(
+            StreamOperatorParameters<StatisticsOrRecord> parameters,
+            String operatorName,
+            TableSchema schema,
+            OperatorEventGateway operatorEventGateway) {
+        super();
+        this.operatorName = operatorName;
+        this.schema = schema;
+        this.operatorEventGateway = operatorEventGateway;
+        this.setup(
+                parameters.getContainingTask(),
+                parameters.getStreamConfig(),
+                parameters.getOutput());
+    }
+
+    @Override
+    public void open() throws Exception {
+        this.extractor = new RowPartitionKeyExtractor(schema);
+        this.statisticsSerializer = new DataStatisticsSerializer();
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        this.subtaskIndex = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
+        this.localStatistics = StatisticsUtil.createDataStatistics();
+    }
+
+    @Override
+    public void handleOperatorEvent(OperatorEvent event) {
+        checkArgument(
+                event instanceof StatisticsEvent,
+                String.format(
+                        "Operator %s subtask %s received unexpected operator 
event %s",
+                        operatorName, subtaskIndex, event.getClass()));
+        StatisticsEvent statisticsEvent = (StatisticsEvent) event;
+        LOG.debug(
+                "Operator {} subtask {} received global data event from 
coordinator checkpoint {}",
+                operatorName,
+                subtaskIndex,
+                statisticsEvent.getCheckpointId());
+        DataStatistics globalStatistics =
+                StatisticsUtil.deserializeDataStatistics(
+                        statisticsEvent.getStatisticsBytes(), 
statisticsSerializer);
+        if (globalStatistics != null) {
+            output.collect(new 
StreamRecord<>(StatisticsOrRecord.fromStatistics(globalStatistics)));
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<InternalRow> streamRecord) throws 
Exception {
+        InternalRow row = streamRecord.getValue();
+        BinaryRow partition = extractor.partition(row);
+        String partitionKey = partition.toString();

Review Comment:
   `BinaryRow.toString()` is not a lossless partition key. 
`BinaryRow`/`BinarySection` does not override `toString()`, so this falls back 
to `Object.toString()`, i.e. `ClassName@<hex hashCode>`, which only embeds the 
32-bit content-based `hashCode()`. Different partition values can therefore 
collide and be merged into the same statistics entry, so the dynamic 
assignment can be built from incorrect weights. Could we use a canonical key 
here instead, e.g. copied/serialized `BinaryRow` bytes (with an appropriate 
serializer) or a partition-path/value string, and use the same key on the 
lookup side?



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java:
##########
@@ -562,7 +567,179 @@ public void 
testPartitionStrategyForPartitionedTable(CoreOptions.PartitionSinkSt
                 strategy == CoreOptions.PartitionSinkStrategy.HASH
                         ? hashStrategyResultFileCount
                         : lessSinkParallelism;
-        partitionEntriesLess.forEach(x -> 
assertThat(x.fileCount()).isEqualTo(fileCountLess));
+        if (strategy == CoreOptions.PartitionSinkStrategy.PARTITION_DYNAMIC) {
+            fileCountLess = Math.min(lessSinkParallelism, 4);
+        }
+        final int expectedFileCountLess = fileCountLess;
+        partitionEntriesLess.forEach(
+                x -> 
assertThat(x.fileCount()).isEqualTo(expectedFileCountLess));
+    }
+
+    @Test
+    public void testPartitionDynamicDataCorrectness() {
+        batchSql(
+                "CREATE TABLE IF NOT EXISTS dynamic_correctness ("
+                        + "id INT, data STRING, dt STRING) PARTITIONED BY (dt)"
+                        + " WITH ("
+                        + "'bucket' = '-1',"
+                        + "'partition.sink-strategy' = 'partition_dynamic',"
+                        + "'sink.parallelism' = '4')");
+
+        batchSql(
+                "INSERT INTO dynamic_correctness VALUES "
+                        + "(1, 'a', '20250301'), (2, 'b', '20250301'), "
+                        + "(3, 'c', '20250302'), (4, 'd', '20250302'), "
+                        + "(5, 'e', '20250303')");
+
+        List<Row> result = batchSql("SELECT * FROM dynamic_correctness ORDER 
BY id");
+        assertThat(result).hasSize(5);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, "a", "20250301"),
+                        Row.of(2, "b", "20250301"),
+                        Row.of(3, "c", "20250302"),
+                        Row.of(4, "d", "20250302"),
+                        Row.of(5, "e", "20250303"));
+    }
+
+    @Test
+    public void testPartitionDynamicWithSkewedData() {
+        batchSql(
+                "CREATE TABLE IF NOT EXISTS dynamic_skewed ("
+                        + "id INT, data STRING, dt STRING) PARTITIONED BY (dt)"
+                        + " WITH ("
+                        + "'bucket' = '-1',"
+                        + "'partition.sink-strategy' = 'partition_dynamic',"
+                        + "'sink.parallelism' = '4')");
+
+        // Heavily skewed: partition '20250301' gets most data
+        StringBuilder values = new StringBuilder();
+        for (int i = 1; i <= 100; i++) {
+            values.append(String.format("(%d, 'data%d', '20250301'),", i, i));
+        }
+        for (int i = 101; i <= 110; i++) {
+            values.append(String.format("(%d, 'data%d', '20250302'),", i, i));
+        }
+        for (int i = 111; i <= 115; i++) {
+            values.append(String.format("(%d, 'data%d', '20250303'),", i, i));
+        }
+

Review Comment:
   Most of the new IT coverage runs through batch `INSERT`, so it mainly 
verifies data correctness and the no-statistics fallback (`min(parallelism, 
4)`). Since the new dynamic behavior is driven by checkpoint statistics and 
coordinator events, could we add an end-to-end streaming/checkpointed test that 
writes skewed data to a hot partition both before and after at least one 
checkpoint, and verifies that the hot partition is actually spread according 
to the updated statistics? That would cover the coordinator -> operator event 
-> partitioner update path, not only the direct unit-test path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to