This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9f971650fc [core] introduce weighted strategy for external-path (#7356)
9f971650fc is described below
commit 9f971650fcd568944089cb66dc9ded0d26a24b65
Author: LsomeYeah <[email protected]>
AuthorDate: Sat Mar 7 10:32:05 2026 +0800
[core] introduce weighted strategy for external-path (#7356)
---
.../shortcodes/generated/core_configuration.html | 20 ++-
.../main/java/org/apache/paimon/CoreOptions.java | 38 ++++-
.../org/apache/paimon/fs/ExternalPathProvider.java | 12 +-
.../paimon/fs/WeightedExternalPathProvider.java | 71 +++++++++
.../fs/WeightedExternalPathProviderTest.java | 168 +++++++++++++++++++++
.../java/org/apache/paimon/AbstractFileStore.java | 1 +
.../paimon/table/format/FormatTableFileWriter.java | 1 +
.../apache/paimon/utils/FileStorePathFactory.java | 8 +-
.../apache/paimon/io/DataFileIndexWriterTest.java | 1 +
.../paimon/io/KeyValueFileReadWriteTest.java | 1 +
.../paimon/manifest/ManifestFileMetaTestBase.java | 1 +
.../apache/paimon/manifest/ManifestFileTest.java | 1 +
.../apache/paimon/manifest/ManifestListTest.java | 1 +
.../paimon/utils/FileStorePathFactoryTest.java | 2 +
.../apache/paimon/flink/AppendOnlyTableITCase.java | 46 ++++++
.../flink/PrimaryKeyFileStoreTableITCase.java | 58 +++++++
.../flink/source/TestChangelogDataReadWrite.java | 1 +
.../apache/paimon/spark/SparkFileIndexITCase.java | 1 +
18 files changed, 422 insertions(+), 10 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index b592fea8bb..5003d23dac 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -57,22 +57,22 @@ under the License.
<td>Write blob field using blob descriptor rather than blob
bytes.</td>
</tr>
<tr>
- <td><h5>blob-external-storage-field</h5></td>
+ <td><h5>blob-descriptor-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Comma-separated BLOB field names (must be a subset of
'blob-descriptor-field') whose raw data will be written to external storage at
write time. The external storage path is configured via
'blob-external-storage-path'. Orphan file cleanup is not applied to that
path.</td>
+ <td>Comma-separated BLOB field names to store as serialized
BlobDescriptor bytes inline in data files.</td>
</tr>
<tr>
- <td><h5>blob-external-storage-path</h5></td>
+ <td><h5>blob-external-storage-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>The external storage path where raw BLOB data from fields
configured by 'blob-external-storage-field' is written at write time. Orphan
file cleanup is not applied to this path.</td>
+ <td>Comma-separated BLOB field names (must be a subset of
'blob-descriptor-field') whose raw data will be written to external storage at
write time. The external storage path is configured via
'blob-external-storage-path'. Orphan file cleanup is not applied to that
path.</td>
</tr>
<tr>
- <td><h5>blob-descriptor-field</h5></td>
+ <td><h5>blob-external-storage-path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Comma-separated BLOB field names to store as serialized
BlobDescriptor bytes inline in data files.</td>
+ <td>The external storage path where raw BLOB data from fields
configured by 'blob-external-storage-field' is written at write time. Orphan
file cleanup is not applied to this path.</td>
</tr>
<tr>
<td><h5>blob-field</h5></td>
@@ -426,7 +426,13 @@ under the License.
<td><h5>data-file.external-paths.strategy</h5></td>
<td style="word-wrap: break-word;">none</td>
<td><p>Enum</p></td>
- <td>The strategy of selecting an external path when writing
data.<br /><br />Possible values:<ul><li>"none": Do not choose any external
storage, data will still be written to the default warehouse
path.</li><li>"specific-fs": Select a specific file system as the external
path. Currently supported are S3 and OSS.</li><li>"round-robin": When writing a
new file, a path is chosen from data-file.external-paths in
turn.</li><li>"entropy-inject": When writing a new file, a path is c [...]
+ <td>The strategy of selecting an external path when writing
data.<br /><br />Possible values:<ul><li>"none": Do not choose any external
storage, data will still be written to the default warehouse
path.</li><li>"specific-fs": Select a specific file system as the external
path. Currently supported are S3 and OSS.</li><li>"round-robin": When writing a
new file, a path is chosen from data-file.external-paths in
turn.</li><li>"entropy-inject": When writing a new file, a path is c [...]
+ </tr>
+ <tr>
+ <td><h5>data-file.external-paths.weights</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The weights for external paths when
data-file.external-paths.strategy is set to weight-robin. Format:
'weight1,weight2,...' with weights corresponding to paths in
data-file.external-paths by order. Example: '10,5,15' means first path has
weight 10, second 5, third 15. Weights must be positive integers.</td>
</tr>
<tr>
<td><h5>data-file.path-directory</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index aa27621b21..70e50efe9c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -209,6 +209,23 @@ public class CoreOptions implements Serializable {
+ ExternalPathStrategy.SPECIFIC_FS
+ ", should be the prefix scheme of the
external path, now supported are s3 and oss.");
+    /**
+     * Comma-separated per-path weights for the {@link ExternalPathStrategy#WEIGHTED} strategy; the
+     * i-th weight applies to the i-th entry of {@link #DATA_FILE_EXTERNAL_PATHS}.
+     */
+    public static final ConfigOption<String> DATA_FILE_EXTERNAL_PATHS_WEIGHTS =
+            key("data-file.external-paths.weights")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            String.format(
+                                    "The weights for external paths when %s is set to %s. "
+                                            + "Format: 'weight1,weight2,...' with weights "
+                                            + "corresponding to paths in %s by order. "
+                                            + "Example: '10,5,15' means first path has weight 10, "
+                                            + "second 5, third 15. "
+                                            + "Weights must be positive integers.",
+                                    DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(),
+                                    ExternalPathStrategy.WEIGHTED,
+                                    DATA_FILE_EXTERNAL_PATHS.key()));
+
+
public static final ConfigOption<Boolean>
COMPACTION_FORCE_REWRITE_ALL_FILES =
key("compaction.force-rewrite-all-files")
.booleanType()
@@ -3178,6 +3195,21 @@ public class CoreOptions implements Serializable {
return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
}
+    /**
+     * Parses {@link #DATA_FILE_EXTERNAL_PATHS_WEIGHTS} into an array of weights.
+     *
+     * @return the configured weights in the order given, or {@code null} if the option is not set
+     * @throws IllegalArgumentException if an entry is not an integer or is not positive
+     */
+    @Nullable
+    public int[] externalPathWeights() {
+        String weightsStr = options.get(DATA_FILE_EXTERNAL_PATHS_WEIGHTS);
+        if (weightsStr == null) {
+            return null;
+        }
+        String[] parts = weightsStr.split(",");
+        int[] weights = new int[parts.length];
+        for (int i = 0; i < parts.length; i++) {
+            String part = parts[i].trim();
+            try {
+                weights[i] = Integer.parseInt(part);
+            } catch (NumberFormatException e) {
+                // NumberFormatException is already an IllegalArgumentException; rethrow the same
+                // supertype but name the option and the raw value so the user can fix it.
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Invalid weight '%s' in '%s' = '%s', "
+                                        + "weights must be positive integers.",
+                                part,
+                                DATA_FILE_EXTERNAL_PATHS_WEIGHTS.key(),
+                                weightsStr),
+                        e);
+            }
+            checkArgument(weights[i] > 0, "Weight must be positive, got: %s", weights[i]);
+        }
+        return weights;
+    }
+
public Boolean forceRewriteAllFiles() {
return options.get(COMPACTION_FORCE_REWRITE_ALL_FILES);
}
@@ -4088,7 +4120,11 @@ public class CoreOptions implements Serializable {
ENTROPY_INJECT(
"entropy-inject",
- "When writing a new file, a path is chosen based on the hash
value of the file's content.");
+ "When writing a new file, a path is chosen based on the hash
value of the file's content."),
+
+ WEIGHTED(
+ "weight-robin",
+ "When writing a new file, a path is chosen based on configured
weights.");
private final String value;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
index 65fb5deb74..8ad617f8a7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
@@ -32,7 +32,10 @@ public interface ExternalPathProvider extends Serializable {
@Nullable
static ExternalPathProvider create(
- ExternalPathStrategy strategy, List<Path> externalTablePaths, Path
relativeBucketPath) {
+ ExternalPathStrategy strategy,
+ List<Path> externalTablePaths,
+ Path relativeBucketPath,
+ @Nullable int[] weights) {
switch (strategy) {
case ENTROPY_INJECT:
return new EntropyInjectExternalPathProvider(
@@ -41,6 +44,13 @@ public interface ExternalPathProvider extends Serializable {
// specific fs can use round-robin with only one path
case ROUND_ROBIN:
return new RoundRobinExternalPathProvider(externalTablePaths,
relativeBucketPath);
+ case WEIGHTED:
+ if (externalTablePaths.size() < 2 || weights == null) {
+ return new RoundRobinExternalPathProvider(
+ externalTablePaths, relativeBucketPath);
+ }
+ return new WeightedExternalPathProvider(
+ externalTablePaths, relativeBucketPath, weights);
case NONE:
return null;
default:
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/WeightedExternalPathProvider.java
b/paimon-common/src/main/java/org/apache/paimon/fs/WeightedExternalPathProvider.java
new file mode 100644
index 0000000000..e486000845
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/WeightedExternalPathProvider.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Provider for weighted external data paths.
+ *
+ * <p>This provider uses a weighted random algorithm to select paths based on configured weights:
+ * every call independently picks path {@code i} with probability {@code weights[i] /
+ * sum(weights)}. Higher weights result in higher probability of selection.
+ */
+public class WeightedExternalPathProvider implements ExternalPathProvider {
+
+    /**
+     * Maps the exclusive upper bound of each path's cumulative weight range to that path, i.e.
+     * path {@code i} owns the half-open range {@code [w0 + ... + w(i-1), w0 + ... + wi)}.
+     */
+    private final NavigableMap<Long, Path> cumulativeWeightMap;
+
+    /** Sum of all weights, kept as a long so that large int weights cannot overflow. */
+    private final long totalWeight;
+
+    private final Path relativeBucketPath;
+
+    /**
+     * Creates a provider that picks among {@code externalTablePaths} proportionally to weights.
+     *
+     * @param externalTablePaths candidate external table roots, must not be empty
+     * @param relativeBucketPath bucket directory appended to the selected root
+     * @param weights one positive weight per entry of {@code externalTablePaths}, same order
+     * @throws IllegalArgumentException if the sizes differ, no path is given, or any weight is
+     *     not positive
+     */
+    public WeightedExternalPathProvider(
+            List<Path> externalTablePaths, Path relativeBucketPath, int[] weights) {
+        checkArgument(
+                externalTablePaths.size() == weights.length,
+                "The number of external paths and weights should be the same. Paths: "
+                        + externalTablePaths.size()
+                        + ", Weights: "
+                        + weights.length);
+        checkArgument(!externalTablePaths.isEmpty(), "External paths should not be empty.");
+        // This constructor is public and may be called without going through option parsing;
+        // a zero weight would collapse two ranges onto one key and a negative one corrupts them.
+        checkArgument(
+                Arrays.stream(weights).allMatch(weight -> weight > 0),
+                "Weights must be positive integers, got: " + Arrays.toString(weights));
+        this.relativeBucketPath = relativeBucketPath;
+        this.cumulativeWeightMap = buildCumulativeWeightMap(externalTablePaths, weights);
+        this.totalWeight = cumulativeWeightMap.lastKey();
+    }
+
+    @Override
+    public Path getNextExternalDataPath(String fileName) {
+        // Uniform draw in [0, totalWeight). higherEntry returns the smallest key strictly
+        // greater than the draw, i.e. the path whose half-open range contains it. Integer
+        // arithmetic keeps the range boundaries exact.
+        long randomValue = ThreadLocalRandom.current().nextLong(totalWeight);
+        Path selectedPath = cumulativeWeightMap.higherEntry(randomValue).getValue();
+        return new Path(new Path(selectedPath, relativeBucketPath), fileName);
+    }
+
+    /** Builds the cumulative-upper-bound to path lookup used for weighted selection. */
+    private static NavigableMap<Long, Path> buildCumulativeWeightMap(
+            List<Path> externalTablePaths, int[] weights) {
+        NavigableMap<Long, Path> map = new TreeMap<>();
+        long cumulativeWeight = 0;
+        for (int i = 0; i < externalTablePaths.size(); i++) {
+            cumulativeWeight += weights[i];
+            map.put(cumulativeWeight, externalTablePaths.get(i));
+        }
+        return map;
+    }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/WeightedExternalPathProviderTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/WeightedExternalPathProviderTest.java
new file mode 100644
index 0000000000..9d9ffb751b
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/fs/WeightedExternalPathProviderTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.apache.paimon.CoreOptions;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link WeightedExternalPathProvider}. */
+public class WeightedExternalPathProviderTest {
+
+ @Test
+ public void testEqualWeights() {
+ int fileNum = 3000;
+ int[] weights = {10, 10, 10};
+ Map<String, Integer> pathCounts = generatePaths(fileNum, weights);
+
+ int expectedCount = fileNum / 3;
+ for (int count : pathCounts.values()) {
+ assertThat(count).isBetween(expectedCount - 100, expectedCount +
100);
+ }
+ }
+
+ @Test
+ public void testDifferentWeights() {
+ int[] weights = {10, 5, 15};
+ int fileNum = 3000;
+ Map<String, Integer> pathCounts = generatePaths(fileNum, weights);
+
+ int totalWeight = 30;
+ assertThat(pathCounts.get("s3://bucket1/data"))
+ .isBetween(
+ (int) (fileNum * 10.0 / totalWeight) - 100,
+ (int) (fileNum * 10.0 / totalWeight) + 100);
+ assertThat(pathCounts.get("oss://bucket2/data"))
+ .isBetween(
+ (int) (fileNum * 5.0 / totalWeight) - 100,
+ (int) (fileNum * 5.0 / totalWeight) + 100);
+ assertThat(pathCounts.get("hdfs://namenode/data"))
+ .isBetween(
+ (int) (fileNum * 15.0 / totalWeight) - 100,
+ (int) (fileNum * 15.0 / totalWeight) + 100);
+ }
+
+ @Test
+ public void testSinglePath() {
+ List<Path> paths = new ArrayList<>();
+ paths.add(new Path("s3://bucket1/data"));
+
+ int[] weights = {10};
+
+ Path relativeBucketPath = new Path("bucket-0");
+ WeightedExternalPathProvider provider =
+ new WeightedExternalPathProvider(paths, relativeBucketPath,
weights);
+
+ for (int fileNum = 0; fileNum < 1000; fileNum++) {
+ Path selectedPath = provider.getNextExternalDataPath("file-" +
fileNum + ".parquet");
+ assertThat(selectedPath.toString())
+ .contains("s3://bucket1/data/bucket-0/file-" + fileNum +
".parquet");
+ }
+ }
+
+ @Test
+ public void testMissingWeight() {
+ List<Path> paths = new ArrayList<>();
+ paths.add(new Path("s3://bucket1/data"));
+ paths.add(new Path("oss://bucket2/data"));
+
+ int[] weights = {10};
+ // Missing weight for oss://bucket2/data
+
+ Path relativeBucketPath = new Path("bucket-0");
+
+ assertThatThrownBy(
+ () -> new WeightedExternalPathProvider(paths,
relativeBucketPath, weights))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "The number of external paths and weights should be
the same. Paths: 2, Weights: 1");
+ }
+
+ @Test
+ public void testPathConstruction() {
+ List<Path> paths = new ArrayList<>();
+ paths.add(new Path("s3://bucket1/data"));
+
+ int[] weights = {10};
+
+ Path relativeBucketPath = new Path("bucket-0");
+ WeightedExternalPathProvider provider =
+ new WeightedExternalPathProvider(paths, relativeBucketPath,
weights);
+
+ Path selectedPath =
provider.getNextExternalDataPath("test-file.parquet");
+ assertThat(selectedPath.toString())
+ .isEqualTo("s3://bucket1/data/bucket-0/test-file.parquet");
+ }
+
+ @Test
+ public void testCreateExternalPathProvider() {
+ ExternalPathProvider provider1 =
+ ExternalPathProvider.create(
+ CoreOptions.ExternalPathStrategy.WEIGHTED,
+ Arrays.asList(new Path("oss://path1"), new
Path("oss://path2")),
+ new Path("bucket-0"),
+ null);
+
assertThat(provider1).isInstanceOf(RoundRobinExternalPathProvider.class);
+
+ ExternalPathProvider provider2 =
+ ExternalPathProvider.create(
+ CoreOptions.ExternalPathStrategy.WEIGHTED,
+ Collections.singletonList(new Path("oss://path1")),
+ new Path("bucket-0"),
+ new int[] {10});
+
assertThat(provider2).isInstanceOf(RoundRobinExternalPathProvider.class);
+
+ ExternalPathProvider provider3 =
+ ExternalPathProvider.create(
+ CoreOptions.ExternalPathStrategy.WEIGHTED,
+ Arrays.asList(new Path("oss://path1"), new
Path("oss://path2")),
+ new Path("bucket-0"),
+ new int[] {10, 20});
+ assertThat(provider3).isInstanceOf(WeightedExternalPathProvider.class);
+ }
+
+ private Map<String, Integer> generatePaths(int fileNum, int[] weights) {
+ List<Path> paths = new ArrayList<>();
+ paths.add(new Path("s3://bucket1/data"));
+ paths.add(new Path("oss://bucket2/data"));
+ paths.add(new Path("hdfs://namenode/data"));
+
+ Path relativeBucketPath = new Path("bucket-0");
+ WeightedExternalPathProvider provider =
+ new WeightedExternalPathProvider(paths, relativeBucketPath,
weights);
+
+ Map<String, Integer> pathCounts = new HashMap<>();
+ for (int i = 0; i < fileNum; i++) {
+ Path selectedPath = provider.getNextExternalDataPath("file-" + i +
".parquet");
+ String basePath = selectedPath.getParent().getParent().toString();
+ pathCounts.put(basePath, pathCounts.getOrDefault(basePath, 0) + 1);
+ }
+ return pathCounts;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 1abef11a2c..85e34319ed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -143,6 +143,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
options.dataFilePathDirectory(),
createExternalPaths(),
options.externalPathStrategy(),
+ options.externalPathWeights(),
options.indexFileInDataFileDir(),
options.globalIndexExternalPath());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
index c241eb7d18..23061e0d14 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
@@ -69,6 +69,7 @@ public class FormatTableFileWriter {
options.dataFilePathDirectory(),
null,
CoreOptions.ExternalPathStrategy.NONE,
+ null,
options.indexFileInDataFileDir(),
null);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 6ebe6438ed..e2c99b52c5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -83,6 +83,7 @@ public class FileStorePathFactory {
private final AtomicInteger statsFileCount;
private final List<Path> externalPaths;
private final ExternalPathStrategy strategy;
+ @Nullable private final int[] externalPathWeights;
@Nullable private final Path globalIndexExternalRootDir;
public FileStorePathFactory(
@@ -98,6 +99,7 @@ public class FileStorePathFactory {
@Nullable String dataFilePathDirectory,
List<Path> externalPaths,
ExternalPathStrategy strategy,
+ @Nullable int[] externalPathWeights,
boolean indexFileInDataFileDir,
@Nullable Path globalIndexExternalRootDir) {
this.root = root;
@@ -120,6 +122,7 @@ public class FileStorePathFactory {
this.statsFileCount = new AtomicInteger(0);
this.externalPaths = externalPaths;
this.strategy = strategy;
+ this.externalPathWeights = externalPathWeights;
this.globalIndexExternalRootDir = globalIndexExternalRootDir;
}
@@ -215,7 +218,10 @@ public class FileStorePathFactory {
return null;
}
return ExternalPathProvider.create(
- strategy, externalPaths, relativeBucketPath(partition,
bucket));
+ strategy,
+ externalPaths,
+ relativeBucketPath(partition, bucket),
+ externalPathWeights);
}
public List<Path> getExternalPaths() {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
index 1739a27154..e02a04b063 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
@@ -173,6 +173,7 @@ public class DataFileIndexWriterTest {
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
+ null,
false,
null);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index e350cb2a9a..251e0b17e7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -327,6 +327,7 @@ public class KeyValueFileReadWriteTest {
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
+ null,
false,
null);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 4aa696a46a..a75412c6fb 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -156,6 +156,7 @@ public abstract class ManifestFileMetaTestBase {
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
+ null,
false,
null),
Long.MAX_VALUE,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index dbf11aa8ff..a10d325d84 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -144,6 +144,7 @@ public class ManifestFileTest {
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
+ null,
false,
null);
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) +
1024;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index a4b3ceec5a..2b945f87b4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -176,6 +176,7 @@ public class ManifestListTest {
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
+ null,
false,
null);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
index 146a62cae6..4f067163fc 100644
---
a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
@@ -95,6 +95,7 @@ public class FileStorePathFactoryTest {
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
+ null,
false,
null);
@@ -141,6 +142,7 @@ public class FileStorePathFactoryTest {
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
+ null,
false,
null);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 6dc41981bf..13e0825f69 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -34,7 +34,10 @@ import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -225,6 +228,49 @@ public class AppendOnlyTableITCase extends
CatalogITCaseBase {
Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC"),
Row.of(4, "DDD"));
}
+    @Test
+    public void testReadWriteWithExternalPathWeightRobinStrategy() throws IOException {
+        String externalPaths =
+                TraceableFileIO.SCHEME
+                        + "://"
+                        + tempExternalPath1.toString()
+                        + ","
+                        + LocalFileIOLoader.SCHEME
+                        + "://"
+                        + tempExternalPath2.toString();
+        batchSql(
+                "ALTER TABLE append_table SET ("
+                        + "'data-file.external-paths' = '"
+                        + externalPaths
+                        + "', "
+                        + "'data-file.external-paths.strategy' = 'weight-robin', "
+                        + "'data-file.external-paths.weights' = '1,3', "
+                        + "'write-only' = 'true'"
+                        + ")");
+
+        int fileNum = 50;
+        for (int i = 1; i <= fileNum; i++) {
+            batchSql("INSERT INTO append_table VALUES (" + i + ", 'AAA')");
+        }
+
+        List<Row> rows = batchSql("SELECT * FROM append_table");
+        assertThat(rows.size()).isEqualTo(fileNum);
+
+        // Verify file distribution based on weights. Files.list keeps a directory handle open
+        // until its stream is closed, hence try-with-resources.
+        long filesInPath1;
+        try (java.util.stream.Stream<Path> files =
+                Files.list(Paths.get(tempExternalPath1.toString() + "/bucket-0"))) {
+            filesInPath1 = files.count();
+        }
+        long filesInPath2;
+        try (java.util.stream.Stream<Path> files =
+                Files.list(Paths.get(tempExternalPath2.toString() + "/bucket-0"))) {
+            filesInPath2 = files.count();
+        }
+        long totalFiles = filesInPath1 + filesInPath2;
+
+        // Since the file sample size is small in IT case, we only verify that higher-weighted path
+        // has more files
+        assertThat(filesInPath1).isGreaterThan(0);
+        assertThat(filesInPath2).isGreaterThan(0);
+        assertThat(filesInPath2).isGreaterThan(filesInPath1);
+        assertThat(totalFiles).isEqualTo(fileNum);
+    }
+
@Test
public void testReadWriteWithExternalPathNoneStrategy() {
String externalPaths = TraceableFileIO.SCHEME + "://" +
tempExternalPath1.toString();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index d98f277a55..5883c51716 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -306,6 +307,63 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
assertThat(actual).containsExactlyInAnyOrder("+I[1, A]", "+I[2, B]",
"+I[3, C]");
}
+    @Test
+    public void testTableReadWriteWithExternalPathWeightRobin() throws Exception {
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100)
+                        .parallelism(1)
+                        .build();
+
+        sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        sEnv.executeSql("USE CATALOG testCatalog");
+        String externalPaths =
+                TraceableFileIO.SCHEME
+                        + "://"
+                        + externalPath1.toString()
+                        + ","
+                        + LocalFileIOLoader.SCHEME
+                        + "://"
+                        + externalPath2.toString();
+        // Selection is random per file. A 3:1 ratio makes the ordering assertion below
+        // practically deterministic for 50 files, whereas a 2:1 ratio lets the lighter path tie
+        // or win in roughly 1% of runs.
+        sEnv.executeSql(
+                "CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) "
+                        + "WITH ( "
+                        + "'bucket' = '1',"
+                        + "'write-only' = 'true',"
+                        + "'data-file.external-paths' = '"
+                        + externalPaths
+                        + "',"
+                        + "'data-file.external-paths.strategy' = 'weight-robin',"
+                        + "'data-file.external-paths.weights' = '15,5'"
+                        + ")");
+
+        int fileNum = 50;
+        List<String> actual = new ArrayList<>();
+        // Close the collect iterator once all expected rows have been read.
+        try (CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T2"))) {
+            for (int i = 1; i <= fileNum; i++) {
+                sEnv.executeSql("INSERT INTO T2 VALUES (" + i + ", 'data" + i + "')").await();
+            }
+            for (int i = 0; i < fileNum; i++) {
+                actual.add(it.next().toString());
+            }
+        }
+        // Verify all data is readable
+        assertThat(actual).hasSize(fileNum);
+
+        // Files.list keeps a directory handle open until its stream is closed.
+        long filesInPath1;
+        try (java.util.stream.Stream<java.nio.file.Path> files =
+                Files.list(Paths.get(externalPath1.toString() + "/bucket-0"))) {
+            filesInPath1 = files.count();
+        }
+        long filesInPath2;
+        try (java.util.stream.Stream<java.nio.file.Path> files =
+                Files.list(Paths.get(externalPath2.toString() + "/bucket-0"))) {
+            filesInPath2 = files.count();
+        }
+        long totalFiles = filesInPath1 + filesInPath2;
+
+        // Since the file sample size is small in IT case, we only verify that higher-weighted path
+        // has more files
+        assertThat(filesInPath1).isGreaterThan(0);
+        assertThat(filesInPath2).isGreaterThan(0);
+        assertThat(filesInPath1).isGreaterThan(filesInPath2);
+        assertThat(totalFiles).isEqualTo(fileNum);
+    }
+
@Test
public void testDropTableWithExternalPaths() throws Exception {
TableEnvironment sEnv =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 23e837d31d..8114ac17eb 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -113,6 +113,7 @@ public class TestChangelogDataReadWrite {
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
+ null,
false,
null);
this.snapshotManager = newSnapshotManager(LocalFileIO.create(), new
Path(root));
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index 20d78a76c3..ed3c53683b 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -211,6 +211,7 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
+ null,
false,
null);