This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5cf23382ec [core] Refactor ExternalPathProvider to generate file in
DataFilePathFactory
5cf23382ec is described below
commit 5cf23382ecf6f09d2662c26f0f3729658b2b959d
Author: JingsongLi <[email protected]>
AuthorDate: Mon Jan 6 16:48:38 2025 +0800
[core] Refactor ExternalPathProvider to generate file in DataFilePathFactory
---
.../main/java/org/apache/paimon/CoreOptions.java | 1 -
.../paimon/fs/DataFileExternalPathProvider.java | 82 ---------
.../org/apache/paimon/fs/ExternalPathProvider.java | 51 ++++++
.../paimon/fs/TableExternalPathProvider.java | 192 --------------------
.../paimon/fs/TableExternalPathProviderTest.java | 194 ---------------------
.../java/org/apache/paimon/AbstractFileStore.java | 42 ++++-
.../iceberg/manifest/IcebergManifestFile.java | 1 -
.../org/apache/paimon/io/DataFilePathFactory.java | 25 ++-
.../apache/paimon/io/KeyValueDataFileWriter.java | 5 +-
.../paimon/io/KeyValueFileWriterFactory.java | 25 +--
.../org/apache/paimon/io/RowDataFileWriter.java | 5 +-
.../org/apache/paimon/io/SingleFileWriter.java | 5 +-
.../paimon/io/StatsCollectingSingleFileWriter.java | 5 +-
.../org/apache/paimon/manifest/ManifestFile.java | 1 -
.../apache/paimon/utils/FileStorePathFactory.java | 22 +--
15 files changed, 130 insertions(+), 526 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 8c59cdcd41..002def9f44 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2213,7 +2213,6 @@ public class CoreOptions implements Serializable {
return options.get(DATA_FILE_EXTERNAL_PATHS);
}
- @Nullable
public ExternalPathStrategy externalPathStrategy() {
return options.get(DATA_FILE_EXTERNAL_PATHS_STRATEGY);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/DataFileExternalPathProvider.java
b/paimon-common/src/main/java/org/apache/paimon/fs/DataFileExternalPathProvider.java
deleted file mode 100644
index 71e6f78184..0000000000
---
a/paimon-common/src/main/java/org/apache/paimon/fs/DataFileExternalPathProvider.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.fs;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.Objects;
-import java.util.Optional;
-
-/** Provider for external data paths. */
-public class DataFileExternalPathProvider implements Serializable {
- @Nullable private final TableExternalPathProvider
tableExternalPathProvider;
- private final Path relativeBucketPath;
-
- public DataFileExternalPathProvider(
- @Nullable TableExternalPathProvider tableExternalPathProvider,
- Path relativeBucketPath) {
- this.tableExternalPathProvider = tableExternalPathProvider;
- this.relativeBucketPath = relativeBucketPath;
- }
-
- /**
- * Get the next external data path.
- *
- * @return the next external data path
- */
- public Optional<Path> getNextExternalDataPath() {
- return Optional.ofNullable(tableExternalPathProvider)
- .flatMap(TableExternalPathProvider::getNextExternalPath)
- .map(path -> new Path(path, relativeBucketPath));
- }
-
- public boolean externalPathExists() {
- return tableExternalPathProvider != null &&
tableExternalPathProvider.externalPathExists();
- }
-
- @Override
- public final boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof DataFileExternalPathProvider)) {
- return false;
- }
-
- DataFileExternalPathProvider that = (DataFileExternalPathProvider) o;
- return Objects.equals(tableExternalPathProvider,
that.tableExternalPathProvider)
- && Objects.equals(relativeBucketPath, that.relativeBucketPath);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(tableExternalPathProvider, relativeBucketPath);
- }
-
- @Override
- public String toString() {
- return "DataFileExternalPathProvider{"
- + " externalPathProvider="
- + tableExternalPathProvider
- + ", relativeBucketPath="
- + relativeBucketPath
- + "}";
- }
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
new file mode 100644
index 0000000000..720773d64f
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Provider for external data paths. */
+public class ExternalPathProvider implements Serializable {
+
+ private final List<Path> externalTablePaths;
+ private final Path relativeBucketPath;
+
+ private int position;
+
+ public ExternalPathProvider(List<Path> externalTablePaths, Path
relativeBucketPath) {
+ this.externalTablePaths = externalTablePaths;
+ this.relativeBucketPath = relativeBucketPath;
+ this.position =
ThreadLocalRandom.current().nextInt(externalTablePaths.size());
+ }
+
+ /**
+ * Get the next external data path.
+ *
+ * @return the next external data path
+ */
+ public Path getNextExternalDataPath(String fileName) {
+ position++;
+ if (position == externalTablePaths.size()) {
+ position = 0;
+ }
+ return new Path(new Path(externalTablePaths.get(position),
relativeBucketPath), fileName);
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/TableExternalPathProvider.java
b/paimon-common/src/main/java/org/apache/paimon/fs/TableExternalPathProvider.java
deleted file mode 100644
index 8ae94e22b7..0000000000
---
a/paimon-common/src/main/java/org/apache/paimon/fs/TableExternalPathProvider.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.fs;
-
-import org.apache.paimon.CoreOptions.ExternalPathStrategy;
-import org.apache.paimon.annotation.VisibleForTesting;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Random;
-
-/** Provider for external paths. */
-public class TableExternalPathProvider implements Serializable {
- private final Map<String, Path> externalPathsMap;
- private final List<Path> externalPathsList;
-
- private final ExternalPathStrategy externalPathStrategy;
- private final String externalSpecificFS;
- private int currentIndex = 0;
- private boolean externalPathExists;
-
- public TableExternalPathProvider(
- String externalPaths,
- ExternalPathStrategy externalPathStrategy,
- String externalSpecificFS) {
- this.externalPathsMap = new HashMap<>();
- this.externalPathsList = new ArrayList<>();
- this.externalPathStrategy = externalPathStrategy;
- if (externalSpecificFS != null) {
- this.externalSpecificFS = externalSpecificFS.toLowerCase();
- } else {
- this.externalSpecificFS = null;
- }
- initExternalPaths(externalPaths);
- if (!externalPathsList.isEmpty()) {
- this.currentIndex = new Random().nextInt(externalPathsList.size());
- }
- }
-
- private void initExternalPaths(String externalPaths) {
- if (externalPaths == null) {
- return;
- }
-
- String[] tmpArray = externalPaths.split(",");
- for (String s : tmpArray) {
- Path path = new Path(s.trim());
- String scheme = path.toUri().getScheme();
- if (scheme == null) {
- throw new IllegalArgumentException("scheme should not be null:
" + path);
- }
- scheme = scheme.toLowerCase();
- externalPathsMap.put(scheme, path);
- externalPathsList.add(path);
- }
-
- if (externalPathStrategy != null
- &&
externalPathStrategy.equals(ExternalPathStrategy.SPECIFIC_FS)) {
- if (externalSpecificFS == null) {
- throw new IllegalArgumentException("external specific fs
should not be null: ");
- }
-
- if (!externalPathsMap.containsKey(externalSpecificFS)) {
- throw new IllegalArgumentException(
- "external specific fs not found: " +
externalSpecificFS);
- }
- }
-
- if (!externalPathsMap.isEmpty()
- && !externalPathsList.isEmpty()
- && externalPathStrategy != ExternalPathStrategy.NONE) {
- externalPathExists = true;
- }
- }
-
- /**
- * Get the next external path.
- *
- * @return the next external path
- */
- public Optional<Path> getNextExternalPath() {
- if (externalPathsMap == null || externalPathsMap.isEmpty()) {
- return Optional.empty();
- }
-
- switch (externalPathStrategy) {
- case NONE:
- return Optional.empty();
- case SPECIFIC_FS:
- return getSpecificFSExternalPath();
- case ROUND_ROBIN:
- return getRoundRobinPath();
- default:
- return Optional.empty();
- }
- }
-
- private Optional<Path> getSpecificFSExternalPath() {
- if (!externalPathsMap.containsKey(externalSpecificFS)) {
- return Optional.empty();
- }
- return Optional.of(externalPathsMap.get(externalSpecificFS));
- }
-
- private Optional<Path> getRoundRobinPath() {
- currentIndex = (currentIndex + 1) % externalPathsList.size();
- return Optional.of(externalPathsList.get(currentIndex));
- }
-
- public boolean externalPathExists() {
- return externalPathExists;
- }
-
- @VisibleForTesting
- public Map<String, Path> getExternalPathsMap() {
- return externalPathsMap;
- }
-
- @VisibleForTesting
- public List<Path> getExternalPathsList() {
- return externalPathsList;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- TableExternalPathProvider that = (TableExternalPathProvider) o;
- return currentIndex == that.currentIndex
- && externalPathExists == that.externalPathExists
- && externalPathsMap.equals(that.externalPathsMap)
- && externalPathsList.equals(that.externalPathsList)
- && externalPathStrategy == that.externalPathStrategy
- && Objects.equals(externalSpecificFS, that.externalSpecificFS);
- }
-
- @Override
- public String toString() {
- return "ExternalPathProvider{"
- + " externalPathsMap="
- + externalPathsMap
- + ", externalPathsList="
- + externalPathsList
- + ", externalPathStrategy="
- + externalPathStrategy
- + ", externalSpecificFS='"
- + externalSpecificFS
- + '\''
- + ", currentIndex="
- + currentIndex
- + ", externalPathExists="
- + externalPathExists
- + "}";
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- externalPathsMap,
- externalPathsList,
- externalPathStrategy,
- externalSpecificFS,
- currentIndex,
- externalPathExists);
- }
-}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/TableExternalPathProviderTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/TableExternalPathProviderTest.java
deleted file mode 100644
index 0a444768cf..0000000000
---
a/paimon-common/src/test/java/org/apache/paimon/fs/TableExternalPathProviderTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.fs;
-
-import org.apache.paimon.CoreOptions.ExternalPathStrategy;
-
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Test for {@link TableExternalPathProvider}. */
-public class TableExternalPathProviderTest {
- private TableExternalPathProvider provider;
-
- @BeforeEach
- public void setUp() {
- provider =
- new TableExternalPathProvider(
- "oss://bucket1/path1,s3://bucket2/path2",
- ExternalPathStrategy.ROUND_ROBIN,
- null);
- }
-
- @Test
- public void testInitExternalPaths() {
- assertThat(provider.externalPathExists()).isTrue();
- assertThat(provider.getExternalPathsMap().size()).isEqualTo(2);
- assertThat(provider.getExternalPathsList().size()).isEqualTo(2);
- }
-
- @Test
- public void testGetNextExternalPathRoundRobinSimple() {
- String path1 = "s3://bucket2/path2";
- String path2 = "oss://bucket1/path1";
- List<String> expectedPaths = new ArrayList<String>();
- expectedPaths.add(path1);
- expectedPaths.add(path2);
- String externalPaths = path1 + "," + path2;
- provider =
- new TableExternalPathProvider(
- externalPaths, ExternalPathStrategy.ROUND_ROBIN, null);
-
- // Collect the paths returned by getNextExternalPath
- List<String> actualPaths = new ArrayList<>();
- int expectIndex = 0;
- for (int i = 0; i < 6; i++) { // Collect more paths to ensure all are
covered
- Optional<Path> path = provider.getNextExternalPath();
- assertThat(path.isPresent()).isTrue();
- actualPaths.add(path.get().toString());
- if (i == 0) {
- if (path.get().toString().equals(path1)) {
- expectIndex = 0;
- } else if (path.get().toString().equals(path2)) {
- expectIndex = 1;
- }
- }
-
- expectIndex = (expectIndex) % expectedPaths.size();
-
assertThat(path.get().toString().equals(expectedPaths.get(expectIndex))).isTrue();
- expectIndex++;
- }
-
- // Check that all expected paths are present in the actual paths
- for (String expectedPath : expectedPaths) {
- assertThat(actualPaths).contains(expectedPath);
- }
- }
-
- @Test
- public void testGetNextExternalPathRoundRobinComplex() {
- List<String> expectedPathsList = new ArrayList<String>();
- for (int i = 0; i < 20; i++) {
- if (i % 2 == 0) {
- expectedPathsList.add("oss://bucket1/path" + i);
- } else {
- expectedPathsList.add("s3://bucket2/path" + i);
- }
- }
- String externalPaths = String.join(",", expectedPathsList);
- provider =
- new TableExternalPathProvider(
- externalPaths, ExternalPathStrategy.ROUND_ROBIN, null);
-
- // Collect the paths returned by getNextExternalPath
- List<String> actualPaths = new ArrayList<>();
- int expectIndex = 0;
- for (int i = 0; i < 40; i++) { // Collect more paths to ensure all are
covered
- Optional<Path> path = provider.getNextExternalPath();
- assertThat(path.isPresent()).isTrue();
- actualPaths.add(path.get().toString());
- if (i == 0) {
- for (int j = 0; j < expectedPathsList.size(); j++) {
- if
(path.get().toString().equals(expectedPathsList.get(j))) {
- expectIndex = j;
- break;
- }
- }
- }
- expectIndex = (expectIndex) % expectedPathsList.size();
-
assertThat(path.get().toString().equals(expectedPathsList.get(expectIndex))).isTrue();
- expectIndex++;
- }
-
- // Check that all expected paths are present in the actual paths
- for (String expectedPath : expectedPathsList) {
- assertThat(actualPaths).contains(expectedPath);
- }
- }
-
- @Test
- public void testGetNextExternalPathSpecificFS() {
- provider =
- new TableExternalPathProvider(
- "oss://bucket1/path1,s3://bucket2/path2",
- ExternalPathStrategy.SPECIFIC_FS,
- "OSS");
-
- Optional<Path> path = provider.getNextExternalPath();
- assertThat(path.isPresent()).isTrue();
- assertThat(path.get().toString()).isEqualTo("oss://bucket1/path1");
- }
-
- @Test
- public void testGetNextExternalPathNone() {
- provider =
- new TableExternalPathProvider(
- "oss://bucket1/path1,s3://bucket2/path2",
ExternalPathStrategy.NONE, "OSS");
-
- Optional<Path> path = provider.getNextExternalPath();
- assertThat(path.isPresent()).isFalse();
- }
-
- @Test
- public void testUnsupportedExternalPath() {
- Assertions.assertThatThrownBy(
- () -> {
- new TableExternalPathProvider(
- "hdfs://bucket1/path1",
- ExternalPathStrategy.SPECIFIC_FS,
- "oss");
- })
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void testUnsupportedExternalSpecificFS() {
- assertThatThrownBy(
- () -> {
- provider =
- new TableExternalPathProvider(
- "oss://bucket1/path1",
- ExternalPathStrategy.SPECIFIC_FS,
- "S3");
- })
- .satisfies(
- anyCauseMatches(
- IllegalArgumentException.class,
- "external specific fs not found: s3"));
- }
-
- @Test
- public void testExternalSpecificFSNull() {
- Assertions.assertThatThrownBy(
- () -> {
- new TableExternalPathProvider(
- "oss://bucket1/path1",
ExternalPathStrategy.SPECIFIC_FS, null);
- })
- .isInstanceOf(IllegalArgumentException.class);
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 655d75edfa..8b24ec2a54 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -23,7 +23,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.TableExternalPathProvider;
import org.apache.paimon.index.HashIndexFile;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestFile;
@@ -66,6 +65,8 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/**
* Base {@link FileStore} implementation.
*
@@ -122,15 +123,42 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
options.fileSuffixIncludeCompression(),
options.fileCompression(),
options.dataFilePathDirectory(),
- getExternalPathProvider());
+ createExternalPaths());
}
- private TableExternalPathProvider getExternalPathProvider() {
+ private List<Path> createExternalPaths() {
String externalPaths = options.dataFileExternalPaths();
- ExternalPathStrategy externalPathStrategy =
options.externalPathStrategy();
- String externalSpecificFS = options.externalSpecificFS();
- return new TableExternalPathProvider(
- externalPaths, externalPathStrategy, externalSpecificFS);
+ ExternalPathStrategy strategy = options.externalPathStrategy();
+ if (externalPaths == null
+ || externalPaths.isEmpty()
+ || strategy == ExternalPathStrategy.NONE) {
+ return Collections.emptyList();
+ }
+
+ String specificFS = options.externalSpecificFS();
+
+ List<Path> paths = new ArrayList<>();
+ for (String pathString : externalPaths.split(",")) {
+ Path path = new Path(pathString.trim());
+ String scheme = path.toUri().getScheme();
+ if (scheme == null) {
+ throw new IllegalArgumentException("scheme should not be null:
" + path);
+ }
+
+ if (strategy == ExternalPathStrategy.SPECIFIC_FS) {
+ checkArgument(
+ specificFS != null,
+ "External path specificFS should not be null when
strategy is specificFS.");
+ if (scheme.equalsIgnoreCase(specificFS)) {
+ paths.add(path);
+ }
+ } else {
+ paths.add(path);
+ }
+ }
+
+ checkArgument(!paths.isEmpty(), "External paths should not be empty");
+ return paths;
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index d4cfc0b5ec..5955da6220 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -154,7 +154,6 @@ public class IcebergManifestFile extends
ObjectsFile<IcebergManifestEntry> {
path,
serializer::toRow,
fileCompression,
- false,
false);
this.partitionStatsCollector = new
SimpleStatsCollector(partitionType);
this.sequenceNumber = sequenceNumber;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index be32afa644..a70a795ef0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -19,7 +19,7 @@
package org.apache.paimon.io;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.fs.DataFileExternalPathProvider;
+import org.apache.paimon.fs.ExternalPathProvider;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileEntry;
@@ -45,8 +45,7 @@ public class DataFilePathFactory {
private final String changelogFilePrefix;
private final boolean fileSuffixIncludeCompression;
private final String fileCompression;
- @Nullable private final DataFileExternalPathProvider
dataFileExternalPathProvider;
- private final boolean isExternalPath;
+ @Nullable private final ExternalPathProvider externalPathProvider;
public DataFilePathFactory(
Path parent,
@@ -55,7 +54,7 @@ public class DataFilePathFactory {
String changelogFilePrefix,
boolean fileSuffixIncludeCompression,
String fileCompression,
- @Nullable DataFileExternalPathProvider
dataFileExternalPathProvider) {
+ @Nullable ExternalPathProvider externalPathProvider) {
this.parent = parent;
this.uuid = UUID.randomUUID().toString();
this.pathCount = new AtomicInteger(0);
@@ -64,12 +63,7 @@ public class DataFilePathFactory {
this.changelogFilePrefix = changelogFilePrefix;
this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
this.fileCompression = fileCompression;
- this.dataFileExternalPathProvider = dataFileExternalPathProvider;
- if (dataFileExternalPathProvider != null) {
- this.isExternalPath =
dataFileExternalPathProvider.externalPathExists();
- } else {
- this.isExternalPath = false;
- }
+ this.externalPathProvider = externalPathProvider;
}
public Path newPath() {
@@ -85,10 +79,11 @@ public class DataFilePathFactory {
}
public Path newPath(String prefix) {
- return Optional.ofNullable(dataFileExternalPathProvider)
- .flatMap(DataFileExternalPathProvider::getNextExternalDataPath)
- .map(path -> new Path(path, newFileName(prefix)))
- .orElseGet(() -> new Path(parent, newFileName(prefix)));
+ String fileName = newFileName(prefix);
+ if (externalPathProvider != null) {
+ return externalPathProvider.getNextExternalDataPath(fileName);
+ }
+ return new Path(parent, fileName);
}
private String newFileName(String prefix) {
@@ -148,7 +143,7 @@ public class DataFilePathFactory {
}
public boolean isExternalPath() {
- return isExternalPath;
+ return externalPathProvider != null;
}
@VisibleForTesting
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 359855260e..3c7f6b45bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -66,6 +66,7 @@ public abstract class KeyValueDataFileWriter
private final int level;
private final SimpleStatsConverter keyStatsConverter;
+ private final boolean isExternalPath;
private final SimpleStatsConverter valueStatsConverter;
private final InternalRowSerializer keySerializer;
private final FileSource fileSource;
@@ -103,8 +104,7 @@ public abstract class KeyValueDataFileWriter
compression,
StatsCollectorFactories.createStatsFactories(
options, writeRowType.getFieldNames(),
keyType.getFieldNames()),
- options.asyncFileWrite(),
- isExternalPath);
+ options.asyncFileWrite());
this.keyType = keyType;
this.valueType = valueType;
@@ -112,6 +112,7 @@ public abstract class KeyValueDataFileWriter
this.level = level;
this.keyStatsConverter = new SimpleStatsConverter(keyType);
+ this.isExternalPath = isExternalPath;
this.valueStatsConverter = new SimpleStatsConverter(valueType,
options.statsDenseStore());
this.keySerializer = new InternalRowSerializer(keyType);
this.fileSource = fileSource;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index feee724389..53d7dcb68d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -93,23 +93,24 @@ public class KeyValueFileWriterFactory {
public RollingFileWriter<KeyValue, DataFileMeta>
createRollingMergeTreeFileWriter(
int level, FileSource fileSource) {
return new RollingFileWriter<>(
- () ->
- createDataFileWriter(
- formatContext.pathFactory(level).newPath(),
- level,
- fileSource,
-
formatContext.pathFactory(level).isExternalPath()),
+ () -> {
+ DataFilePathFactory pathFactory =
formatContext.pathFactory(level);
+ return createDataFileWriter(
+ pathFactory.newPath(), level, fileSource,
pathFactory.isExternalPath());
+ },
suggestedFileSize);
}
public RollingFileWriter<KeyValue, DataFileMeta>
createRollingChangelogFileWriter(int level) {
return new RollingFileWriter<>(
- () ->
- createDataFileWriter(
-
formatContext.pathFactory(level).newChangelogPath(),
- level,
- FileSource.APPEND,
-
formatContext.pathFactory(level).isExternalPath()),
+ () -> {
+ DataFilePathFactory pathFactory =
formatContext.pathFactory(level);
+ return createDataFileWriter(
+ pathFactory.newChangelogPath(),
+ level,
+ FileSource.APPEND,
+ pathFactory.isExternalPath());
+ },
suggestedFileSize);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 97af4d067a..25906e2dfa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -49,6 +49,7 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
private final long schemaId;
private final LongCounter seqNumCounter;
+ private final boolean isExternalPath;
private final SimpleStatsConverter statsArraySerializer;
@Nullable private final DataFileIndexWriter dataFileIndexWriter;
private final FileSource fileSource;
@@ -77,10 +78,10 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
simpleStatsExtractor,
fileCompression,
statsCollectors,
- asyncFileWrite,
- isExternalPath);
+ asyncFileWrite);
this.schemaId = schemaId;
this.seqNumCounter = seqNumCounter;
+ this.isExternalPath = isExternalPath;
this.statsArraySerializer = new SimpleStatsConverter(writeSchema,
statsDenseStore);
this.dataFileIndexWriter =
DataFileIndexWriter.create(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index 7a3cc4143b..f303e85978 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -54,7 +54,6 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
private long recordCount;
protected boolean closed;
- protected boolean isExternalPath;
public SingleFileWriter(
FileIO fileIO,
@@ -62,8 +61,7 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
Path path,
Function<T, InternalRow> converter,
String compression,
- boolean asyncWrite,
- boolean isExternalPath) {
+ boolean asyncWrite) {
this.fileIO = fileIO;
this.path = path;
this.converter = converter;
@@ -86,7 +84,6 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
this.recordCount = 0;
this.closed = false;
- this.isExternalPath = isExternalPath;
}
public Path path() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
index 0ab2de8c2b..67a3fa6d1a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
@@ -59,9 +59,8 @@ public abstract class StatsCollectingSingleFileWriter<T, R>
extends SingleFileWr
@Nullable SimpleStatsExtractor simpleStatsExtractor,
String compression,
SimpleColStatsCollector.Factory[] statsCollectors,
- boolean asyncWrite,
- boolean isExternalPath) {
- super(fileIO, factory, path, converter, compression, asyncWrite,
isExternalPath);
+ boolean asyncWrite) {
+ super(fileIO, factory, path, converter, compression, asyncWrite);
this.simpleStatsExtractor = simpleStatsExtractor;
if (this.simpleStatsExtractor == null) {
this.simpleStatsCollector = new SimpleStatsCollector(writeSchema,
statsCollectors);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 34bf77345a..61b3e8a517 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -132,7 +132,6 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
path,
serializer::toRow,
fileCompression,
- false,
false);
this.partitionStatsCollector = new
SimpleStatsCollector(partitionType);
this.partitionStatsSerializer = new
SimpleStatsConverter(partitionType);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 7a9a7c4dbe..81b6307a5f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -20,9 +20,8 @@ package org.apache.paimon.utils;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.fs.DataFileExternalPathProvider;
+import org.apache.paimon.fs.ExternalPathProvider;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.TableExternalPathProvider;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.types.RowType;
@@ -57,7 +56,7 @@ public class FileStorePathFactory {
private final AtomicInteger indexManifestCount;
private final AtomicInteger indexFileCount;
private final AtomicInteger statsFileCount;
- @Nullable private final TableExternalPathProvider
tableExternalPathProvider;
+ private final List<Path> externalPaths;
public FileStorePathFactory(
Path root,
@@ -70,7 +69,7 @@ public class FileStorePathFactory {
boolean fileSuffixIncludeCompression,
String fileCompression,
@Nullable String dataFilePathDirectory,
- @Nullable TableExternalPathProvider tableExternalPathProvider) {
+ List<Path> externalPaths) {
this.root = root;
this.dataFilePathDirectory = dataFilePathDirectory;
this.uuid = UUID.randomUUID().toString();
@@ -88,7 +87,7 @@ public class FileStorePathFactory {
this.indexManifestCount = new AtomicInteger(0);
this.indexFileCount = new AtomicInteger(0);
this.statsFileCount = new AtomicInteger(0);
- this.tableExternalPathProvider = tableExternalPathProvider;
+ this.externalPaths = externalPaths;
}
public Path root() {
@@ -133,13 +132,16 @@ public class FileStorePathFactory {
changelogFilePrefix,
fileSuffixIncludeCompression,
fileCompression,
- getDataFileExternalPathProvider(
- tableExternalPathProvider,
relativeBucketPath(partition, bucket)));
+ createExternalPathProvider(partition, bucket));
}
- private DataFileExternalPathProvider getDataFileExternalPathProvider(
- TableExternalPathProvider tableExternalPathProvider, Path
relativeBucketPath) {
- return new DataFileExternalPathProvider(tableExternalPathProvider,
relativeBucketPath);
+ @Nullable
+ private ExternalPathProvider createExternalPathProvider(BinaryRow
partition, int bucket) {
+ if (externalPaths == null || externalPaths.isEmpty()) {
+ return null;
+ }
+
+ return new ExternalPathProvider(externalPaths,
relativeBucketPath(partition, bucket));
}
public Path bucketPath(BinaryRow partition, int bucket) {