This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f971650fc [core] introduce weighted strategy for external-path (#7356)
9f971650fc is described below

commit 9f971650fcd568944089cb66dc9ded0d26a24b65
Author: LsomeYeah <[email protected]>
AuthorDate: Sat Mar 7 10:32:05 2026 +0800

    [core] introduce weighted strategy for external-path (#7356)
---
 .../shortcodes/generated/core_configuration.html   |  20 ++-
 .../main/java/org/apache/paimon/CoreOptions.java   |  38 ++++-
 .../org/apache/paimon/fs/ExternalPathProvider.java |  12 +-
 .../paimon/fs/WeightedExternalPathProvider.java    |  71 +++++++++
 .../fs/WeightedExternalPathProviderTest.java       | 168 +++++++++++++++++++++
 .../java/org/apache/paimon/AbstractFileStore.java  |   1 +
 .../paimon/table/format/FormatTableFileWriter.java |   1 +
 .../apache/paimon/utils/FileStorePathFactory.java  |   8 +-
 .../apache/paimon/io/DataFileIndexWriterTest.java  |   1 +
 .../paimon/io/KeyValueFileReadWriteTest.java       |   1 +
 .../paimon/manifest/ManifestFileMetaTestBase.java  |   1 +
 .../apache/paimon/manifest/ManifestFileTest.java   |   1 +
 .../apache/paimon/manifest/ManifestListTest.java   |   1 +
 .../paimon/utils/FileStorePathFactoryTest.java     |   2 +
 .../apache/paimon/flink/AppendOnlyTableITCase.java |  46 ++++++
 .../flink/PrimaryKeyFileStoreTableITCase.java      |  58 +++++++
 .../flink/source/TestChangelogDataReadWrite.java   |   1 +
 .../apache/paimon/spark/SparkFileIndexITCase.java  |   1 +
 18 files changed, 422 insertions(+), 10 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index b592fea8bb..5003d23dac 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -57,22 +57,22 @@ under the License.
             <td>Write blob field using blob descriptor rather than blob 
bytes.</td>
         </tr>
         <tr>
-            <td><h5>blob-external-storage-field</h5></td>
+            <td><h5>blob-descriptor-field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Comma-separated BLOB field names (must be a subset of 
'blob-descriptor-field') whose raw data will be written to external storage at 
write time. The external storage path is configured via 
'blob-external-storage-path'. Orphan file cleanup is not applied to that 
path.</td>
+            <td>Comma-separated BLOB field names to store as serialized 
BlobDescriptor bytes inline in data files.</td>
         </tr>
         <tr>
-            <td><h5>blob-external-storage-path</h5></td>
+            <td><h5>blob-external-storage-field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>The external storage path where raw BLOB data from fields 
configured by 'blob-external-storage-field' is written at write time. Orphan 
file cleanup is not applied to this path.</td>
+            <td>Comma-separated BLOB field names (must be a subset of 
'blob-descriptor-field') whose raw data will be written to external storage at 
write time. The external storage path is configured via 
'blob-external-storage-path'. Orphan file cleanup is not applied to that 
path.</td>
         </tr>
         <tr>
-            <td><h5>blob-descriptor-field</h5></td>
+            <td><h5>blob-external-storage-path</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Comma-separated BLOB field names to store as serialized 
BlobDescriptor bytes inline in data files.</td>
+            <td>The external storage path where raw BLOB data from fields 
configured by 'blob-external-storage-field' is written at write time. Orphan 
file cleanup is not applied to this path.</td>
         </tr>
         <tr>
             <td><h5>blob-field</h5></td>
@@ -426,7 +426,13 @@ under the License.
             <td><h5>data-file.external-paths.strategy</h5></td>
             <td style="word-wrap: break-word;">none</td>
             <td><p>Enum</p></td>
-            <td>The strategy of selecting an external path when writing data.<br /><br />Possible values:<ul><li>"none": Do not choose any external storage, data will still be written to the default warehouse path.</li><li>"specific-fs": Select a specific file system as the external path. Currently supported are S3 and OSS.</li><li>"round-robin": When writing a new file, a path is chosen from data-file.external-paths in turn.</li><li>"entropy-inject": When writing a new file, a path is chosen based on the hash value of the file's content.</li></ul></td>
+            <td>The strategy of selecting an external path when writing data.<br /><br />Possible values:<ul><li>"none": Do not choose any external storage, data will still be written to the default warehouse path.</li><li>"specific-fs": Select a specific file system as the external path. Currently supported are S3 and OSS.</li><li>"round-robin": When writing a new file, a path is chosen from data-file.external-paths in turn.</li><li>"entropy-inject": When writing a new file, a path is chosen based on the hash value of the file's content.</li><li>"weight-robin": When writing a new file, a path is chosen based on configured weights.</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>data-file.external-paths.weights</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The weights for external paths when data-file.external-paths.strategy is set to weight-robin. Format: 'weight1,weight2,...' with weights corresponding to paths in data-file.external-paths by order. Example: '10,5,15' means first path has weight 10, second 5, third 15. Weights must be positive integers.</td>
         </tr>
         <tr>
             <td><h5>data-file.path-directory</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index aa27621b21..70e50efe9c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -209,6 +209,23 @@ public class CoreOptions implements Serializable {
                                     + ExternalPathStrategy.SPECIFIC_FS
                                     + ", should be the prefix scheme of the external path, now supported are s3 and oss.");
 
+    public static final ConfigOption<String> DATA_FILE_EXTERNAL_PATHS_WEIGHTS =
+            key("data-file.external-paths.weights")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The weights for external paths when "
+                                    + DATA_FILE_EXTERNAL_PATHS_STRATEGY.key()
+                                    + " is set to "
+                                    + ExternalPathStrategy.WEIGHTED
+                                    + ". "
+                                    + "Format: 'weight1,weight2,...' "
+                                    + "with weights corresponding to paths in "
+                                    + DATA_FILE_EXTERNAL_PATHS.key()
+                                    + " by order. "
+                                    + "Example: '10,5,15' means first path has weight 10, second 5, third 15. "
+                                    + "Weights must be positive integers.");
+
     public static final ConfigOption<Boolean> 
COMPACTION_FORCE_REWRITE_ALL_FILES =
             key("compaction.force-rewrite-all-files")
                     .booleanType()
@@ -3178,6 +3195,21 @@ public class CoreOptions implements Serializable {
         return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
     }
 
+    @Nullable
+    public int[] externalPathWeights() {
+        String weightsStr = options.get(DATA_FILE_EXTERNAL_PATHS_WEIGHTS);
+        if (weightsStr == null) {
+            return null;
+        }
+        String[] parts = weightsStr.split(",");
+        int[] weights = new int[parts.length];
+        for (int i = 0; i < parts.length; i++) {
+            weights[i] = Integer.parseInt(parts[i].trim());
+            checkArgument(weights[i] > 0, "Weight must be positive, got: %s", weights[i]);
+        }
+        return weights;
+    }
+
     public Boolean forceRewriteAllFiles() {
         return options.get(COMPACTION_FORCE_REWRITE_ALL_FILES);
     }
@@ -4088,7 +4120,11 @@ public class CoreOptions implements Serializable {
 
         ENTROPY_INJECT(
                 "entropy-inject",
-                "When writing a new file, a path is chosen based on the hash value of the file's content.");
+                "When writing a new file, a path is chosen based on the hash value of the file's content."),
+
+        WEIGHTED(
+                "weight-robin",
+                "When writing a new file, a path is chosen based on configured weights.");
 
         private final String value;
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
index 65fb5deb74..8ad617f8a7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
@@ -32,7 +32,10 @@ public interface ExternalPathProvider extends Serializable {
 
     @Nullable
     static ExternalPathProvider create(
-            ExternalPathStrategy strategy, List<Path> externalTablePaths, Path 
relativeBucketPath) {
+            ExternalPathStrategy strategy,
+            List<Path> externalTablePaths,
+            Path relativeBucketPath,
+            @Nullable int[] weights) {
         switch (strategy) {
             case ENTROPY_INJECT:
                 return new EntropyInjectExternalPathProvider(
@@ -41,6 +44,13 @@ public interface ExternalPathProvider extends Serializable {
                 // specific fs can use round-robin with only one path
             case ROUND_ROBIN:
                 return new RoundRobinExternalPathProvider(externalTablePaths, 
relativeBucketPath);
+            case WEIGHTED:
+                if (externalTablePaths.size() < 2 || weights == null) {
+                    return new RoundRobinExternalPathProvider(
+                            externalTablePaths, relativeBucketPath);
+                }
+                return new WeightedExternalPathProvider(
+                        externalTablePaths, relativeBucketPath, weights);
             case NONE:
                 return null;
             default:
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/WeightedExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/WeightedExternalPathProvider.java
new file mode 100644
index 0000000000..e486000845
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/WeightedExternalPathProvider.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Provider for weighted external data paths.
+ *
+ * <p>This provider uses a weighted random algorithm to select paths based on configured weights.
+ * Higher weights result in higher probability of selection.
+ */
+public class WeightedExternalPathProvider implements ExternalPathProvider {
+
+    private final NavigableMap<Double, Path> cumulativeWeightMap;
+    private final double totalWeight;
+    private final Path relativeBucketPath;
+
+    public WeightedExternalPathProvider(
+            List<Path> externalTablePaths, Path relativeBucketPath, int[] weights) {
+        checkArgument(
+                externalTablePaths.size() == weights.length,
+                "The number of external paths and weights should be the same. Paths: "
+                        + externalTablePaths.size()
+                        + ", Weights: "
+                        + weights.length);
+        this.relativeBucketPath = relativeBucketPath;
+        this.cumulativeWeightMap = buildCumulativeWeightMap(externalTablePaths, weights);
+        this.totalWeight = Arrays.stream(weights).sum();
+    }
+
+    @Override
+    public Path getNextExternalDataPath(String fileName) {
+        double randomValue = ThreadLocalRandom.current().nextDouble() * totalWeight;
+        Path selectedPath = cumulativeWeightMap.higherEntry(randomValue).getValue();
+        return new Path(new Path(selectedPath, relativeBucketPath), fileName);
+    }
+
+    private NavigableMap<Double, Path> buildCumulativeWeightMap(
+            List<Path> externalTablePaths, int[] weights) {
+        NavigableMap<Double, Path> map = new TreeMap<>();
+        double cumulativeWeight = 0;
+        for (int i = 0; i < externalTablePaths.size(); i++) {
+            cumulativeWeight += weights[i];
+            map.put(cumulativeWeight, externalTablePaths.get(i));
+        }
+        return map;
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fs/WeightedExternalPathProviderTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fs/WeightedExternalPathProviderTest.java
new file mode 100644
index 0000000000..9d9ffb751b
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fs/WeightedExternalPathProviderTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.apache.paimon.CoreOptions;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link WeightedExternalPathProvider}. */
+public class WeightedExternalPathProviderTest {
+
+    @Test
+    public void testEqualWeights() {
+        int fileNum = 3000;
+        int[] weights = {10, 10, 10};
+        Map<String, Integer> pathCounts = generatePaths(fileNum, weights);
+
+        int expectedCount = fileNum / 3;
+        for (int count : pathCounts.values()) {
+            assertThat(count).isBetween(expectedCount - 100, expectedCount + 
100);
+        }
+    }
+
+    @Test
+    public void testDifferentWeights() {
+        int[] weights = {10, 5, 15};
+        int fileNum = 3000;
+        Map<String, Integer> pathCounts = generatePaths(fileNum, weights);
+
+        int totalWeight = 30;
+        assertThat(pathCounts.get("s3://bucket1/data"))
+                .isBetween(
+                        (int) (fileNum * 10.0 / totalWeight) - 100,
+                        (int) (fileNum * 10.0 / totalWeight) + 100);
+        assertThat(pathCounts.get("oss://bucket2/data"))
+                .isBetween(
+                        (int) (fileNum * 5.0 / totalWeight) - 100,
+                        (int) (fileNum * 5.0 / totalWeight) + 100);
+        assertThat(pathCounts.get("hdfs://namenode/data"))
+                .isBetween(
+                        (int) (fileNum * 15.0 / totalWeight) - 100,
+                        (int) (fileNum * 15.0 / totalWeight) + 100);
+    }
+
+    @Test
+    public void testSinglePath() {
+        List<Path> paths = new ArrayList<>();
+        paths.add(new Path("s3://bucket1/data"));
+
+        int[] weights = {10};
+
+        Path relativeBucketPath = new Path("bucket-0");
+        WeightedExternalPathProvider provider =
+                new WeightedExternalPathProvider(paths, relativeBucketPath, 
weights);
+
+        for (int fileNum = 0; fileNum < 1000; fileNum++) {
+            Path selectedPath = provider.getNextExternalDataPath("file-" + 
fileNum + ".parquet");
+            assertThat(selectedPath.toString())
+                    .contains("s3://bucket1/data/bucket-0/file-" + fileNum + 
".parquet");
+        }
+    }
+
+    @Test
+    public void testMissingWeight() {
+        List<Path> paths = new ArrayList<>();
+        paths.add(new Path("s3://bucket1/data"));
+        paths.add(new Path("oss://bucket2/data"));
+
+        int[] weights = {10};
+        // Missing weight for oss://bucket2/data
+
+        Path relativeBucketPath = new Path("bucket-0");
+
+        assertThatThrownBy(
+                        () -> new WeightedExternalPathProvider(paths, 
relativeBucketPath, weights))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "The number of external paths and weights should be the same. Paths: 2, Weights: 1");
+    }
+
+    @Test
+    public void testPathConstruction() {
+        List<Path> paths = new ArrayList<>();
+        paths.add(new Path("s3://bucket1/data"));
+
+        int[] weights = {10};
+
+        Path relativeBucketPath = new Path("bucket-0");
+        WeightedExternalPathProvider provider =
+                new WeightedExternalPathProvider(paths, relativeBucketPath, 
weights);
+
+        Path selectedPath = 
provider.getNextExternalDataPath("test-file.parquet");
+        assertThat(selectedPath.toString())
+                .isEqualTo("s3://bucket1/data/bucket-0/test-file.parquet");
+    }
+
+    @Test
+    public void testCreateExternalPathProvider() {
+        ExternalPathProvider provider1 =
+                ExternalPathProvider.create(
+                        CoreOptions.ExternalPathStrategy.WEIGHTED,
+                        Arrays.asList(new Path("oss://path1"), new 
Path("oss://path2")),
+                        new Path("bucket-0"),
+                        null);
+        
assertThat(provider1).isInstanceOf(RoundRobinExternalPathProvider.class);
+
+        ExternalPathProvider provider2 =
+                ExternalPathProvider.create(
+                        CoreOptions.ExternalPathStrategy.WEIGHTED,
+                        Collections.singletonList(new Path("oss://path1")),
+                        new Path("bucket-0"),
+                        new int[] {10});
+        
assertThat(provider2).isInstanceOf(RoundRobinExternalPathProvider.class);
+
+        ExternalPathProvider provider3 =
+                ExternalPathProvider.create(
+                        CoreOptions.ExternalPathStrategy.WEIGHTED,
+                        Arrays.asList(new Path("oss://path1"), new 
Path("oss://path2")),
+                        new Path("bucket-0"),
+                        new int[] {10, 20});
+        assertThat(provider3).isInstanceOf(WeightedExternalPathProvider.class);
+    }
+
+    private Map<String, Integer> generatePaths(int fileNum, int[] weights) {
+        List<Path> paths = new ArrayList<>();
+        paths.add(new Path("s3://bucket1/data"));
+        paths.add(new Path("oss://bucket2/data"));
+        paths.add(new Path("hdfs://namenode/data"));
+
+        Path relativeBucketPath = new Path("bucket-0");
+        WeightedExternalPathProvider provider =
+                new WeightedExternalPathProvider(paths, relativeBucketPath, 
weights);
+
+        Map<String, Integer> pathCounts = new HashMap<>();
+        for (int i = 0; i < fileNum; i++) {
+            Path selectedPath = provider.getNextExternalDataPath("file-" + i + 
".parquet");
+            String basePath = selectedPath.getParent().getParent().toString();
+            pathCounts.put(basePath, pathCounts.getOrDefault(basePath, 0) + 1);
+        }
+        return pathCounts;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 1abef11a2c..85e34319ed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -143,6 +143,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 options.dataFilePathDirectory(),
                 createExternalPaths(),
                 options.externalPathStrategy(),
+                options.externalPathWeights(),
                 options.indexFileInDataFileDir(),
                 options.globalIndexExternalPath());
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
index c241eb7d18..23061e0d14 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
@@ -69,6 +69,7 @@ public class FormatTableFileWriter {
                         options.dataFilePathDirectory(),
                         null,
                         CoreOptions.ExternalPathStrategy.NONE,
+                        null,
                         options.indexFileInDataFileDir(),
                         null);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 6ebe6438ed..e2c99b52c5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -83,6 +83,7 @@ public class FileStorePathFactory {
     private final AtomicInteger statsFileCount;
     private final List<Path> externalPaths;
     private final ExternalPathStrategy strategy;
+    @Nullable private final int[] externalPathWeights;
     @Nullable private final Path globalIndexExternalRootDir;
 
     public FileStorePathFactory(
@@ -98,6 +99,7 @@ public class FileStorePathFactory {
             @Nullable String dataFilePathDirectory,
             List<Path> externalPaths,
             ExternalPathStrategy strategy,
+            @Nullable int[] externalPathWeights,
             boolean indexFileInDataFileDir,
             @Nullable Path globalIndexExternalRootDir) {
         this.root = root;
@@ -120,6 +122,7 @@ public class FileStorePathFactory {
         this.statsFileCount = new AtomicInteger(0);
         this.externalPaths = externalPaths;
         this.strategy = strategy;
+        this.externalPathWeights = externalPathWeights;
         this.globalIndexExternalRootDir = globalIndexExternalRootDir;
     }
 
@@ -215,7 +218,10 @@ public class FileStorePathFactory {
             return null;
         }
         return ExternalPathProvider.create(
-                strategy, externalPaths, relativeBucketPath(partition, 
bucket));
+                strategy,
+                externalPaths,
+                relativeBucketPath(partition, bucket),
+                externalPathWeights);
     }
 
     public List<Path> getExternalPaths() {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
index 1739a27154..e02a04b063 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
@@ -173,6 +173,7 @@ public class DataFileIndexWriterTest {
                         null,
                         null,
                         CoreOptions.ExternalPathStrategy.NONE,
+                        null,
                         false,
                         null);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index e350cb2a9a..251e0b17e7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -327,6 +327,7 @@ public class KeyValueFileReadWriteTest {
                                 null,
                                 null,
                                 CoreOptions.ExternalPathStrategy.NONE,
+                                null,
                                 false,
                                 null);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 4aa696a46a..a75412c6fb 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -156,6 +156,7 @@ public abstract class ManifestFileMetaTestBase {
                                 null,
                                 null,
                                 CoreOptions.ExternalPathStrategy.NONE,
+                                null,
                                 false,
                                 null),
                         Long.MAX_VALUE,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index dbf11aa8ff..a10d325d84 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -144,6 +144,7 @@ public class ManifestFileTest {
                         null,
                         null,
                         CoreOptions.ExternalPathStrategy.NONE,
+                        null,
                         false,
                         null);
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 
1024;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index a4b3ceec5a..2b945f87b4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -176,6 +176,7 @@ public class ManifestListTest {
                 null,
                 null,
                 CoreOptions.ExternalPathStrategy.NONE,
+                null,
                 false,
                 null);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
index 146a62cae6..4f067163fc 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
@@ -95,6 +95,7 @@ public class FileStorePathFactoryTest {
                         null,
                         null,
                         CoreOptions.ExternalPathStrategy.NONE,
+                        null,
                         false,
                         null);
 
@@ -141,6 +142,7 @@ public class FileStorePathFactoryTest {
                 null,
                 null,
                 CoreOptions.ExternalPathStrategy.NONE,
+                null,
                 false,
                 null);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 6dc41981bf..13e0825f69 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -34,7 +34,10 @@ import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -225,6 +228,49 @@ public class AppendOnlyTableITCase extends 
CatalogITCaseBase {
                         Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC"), 
Row.of(4, "DDD"));
     }
 
+    @Test
+    public void testReadWriteWithExternalPathWeightRobinStrategy() throws 
IOException {
+        String externalPaths =
+                TraceableFileIO.SCHEME
+                        + "://"
+                        + tempExternalPath1.toString()
+                        + ","
+                        + LocalFileIOLoader.SCHEME
+                        + "://"
+                        + tempExternalPath2.toString();
+        batchSql(
+                "ALTER TABLE append_table SET ("
+                        + "'data-file.external-paths' = '"
+                        + externalPaths
+                        + "', "
+                        + "'data-file.external-paths.strategy' = 
'weight-robin', "
+                        + "'data-file.external-paths.weights' = '1,3', "
+                        + "'write-only' = 'true'"
+                        + ")");
+
+        int fileNum = 50;
+        for (int i = 1; i <= fileNum; i++) {
+            batchSql("INSERT INTO append_table VALUES (" + i + ", 'AAA')");
+        }
+
+        List<Row> rows = batchSql("SELECT * FROM append_table");
+        assertThat(rows.size()).isEqualTo(fileNum);
+
+        // Verify file distribution based on weights
+        long filesInPath1 =
+                Files.list(Paths.get(tempExternalPath1.toString() + 
"/bucket-0")).count();
+        long filesInPath2 =
+                Files.list(Paths.get(tempExternalPath2.toString() + 
"/bucket-0")).count();
+        long totalFiles = filesInPath1 + filesInPath2;
+
+        // Since the file sample size is small in IT case, we only verify that higher-weighted path
+        // has more files
+        assertThat(filesInPath1).isGreaterThan(0);
+        assertThat(filesInPath2).isGreaterThan(0);
+        assertThat(filesInPath2).isGreaterThan(filesInPath1);
+        assertThat(totalFiles).isEqualTo(fileNum);
+    }
+
     @Test
     public void testReadWriteWithExternalPathNoneStrategy() {
         String externalPaths = TraceableFileIO.SCHEME + "://" + 
tempExternalPath1.toString();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index d98f277a55..5883c51716 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -306,6 +307,63 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
         assertThat(actual).containsExactlyInAnyOrder("+I[1, A]", "+I[2, B]", 
"+I[3, C]");
     }
 
+    @Test
+    public void testTableReadWriteWithExternalPathWeightRobin() throws 
Exception {
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        
.checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100)
+                        .parallelism(1)
+                        .build();
+
+        sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        sEnv.executeSql("USE CATALOG testCatalog");
+        String externalPaths =
+                TraceableFileIO.SCHEME
+                        + "://"
+                        + externalPath1.toString()
+                        + ","
+                        + LocalFileIOLoader.SCHEME
+                        + "://"
+                        + externalPath2.toString();
+        sEnv.executeSql(
+                "CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT 
ENFORCED ) "
+                        + "WITH ( "
+                        + "'bucket' = '1',"
+                        + "'write-only' = 'true',"
+                        + "'data-file.external-paths' = '"
+                        + externalPaths
+                        + "',"
+                        + "'data-file.external-paths.strategy' = 
'weight-robin',"
+                        + "'data-file.external-paths.weights' = '10,5'"
+                        + ")");
+
+        CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM 
T2"));
+
+        int fileNum = 50;
+        for (int i = 1; i <= fileNum; i++) {
+            sEnv.executeSql("INSERT INTO T2 VALUES (" + i + ", 'data" + i + 
"')").await();
+        }
+
+        List<String> actual = new ArrayList<>();
+        for (int i = 0; i < fileNum; i++) {
+            actual.add(it.next().toString());
+        }
+        // Verify all data is readable
+        assertThat(actual).hasSize(fileNum);
+
+        long filesInPath1 = Files.list(Paths.get(externalPath1.toString() + 
"/bucket-0")).count();
+        long filesInPath2 = Files.list(Paths.get(externalPath2.toString() + 
"/bucket-0")).count();
+        long totalFiles = filesInPath1 + filesInPath2;
+
+        // Since the sample size is small in this IT case, we only verify that
+        // the higher-weighted path has more files.
+        assertThat(filesInPath1).isGreaterThan(0);
+        assertThat(filesInPath2).isGreaterThan(0);
+        assertThat(filesInPath1).isGreaterThan(filesInPath2);
+        assertThat(totalFiles).isEqualTo(fileNum);
+    }
+
     @Test
     public void testDropTableWithExternalPaths() throws Exception {
         TableEnvironment sEnv =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 23e837d31d..8114ac17eb 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -113,6 +113,7 @@ public class TestChangelogDataReadWrite {
                         null,
                         null,
                         CoreOptions.ExternalPathStrategy.NONE,
+                        null,
                         false,
                         null);
         this.snapshotManager = newSnapshotManager(LocalFileIO.create(), new 
Path(root));
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index 20d78a76c3..ed3c53683b 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -211,6 +211,7 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
                         null,
                         null,
                         CoreOptions.ExternalPathStrategy.NONE,
+                        null,
                         false,
                         null);
 


Reply via email to