This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e483e467163 add PinotFS support for streamed untar segment download
(#17586)
e483e467163 is described below
commit e483e467163f60f33b6660cd0f0e76d4d79f95d7
Author: mluvin-stripe <[email protected]>
AuthorDate: Tue Mar 3 16:07:47 2026 -0500
add PinotFS support for streamed untar segment download (#17586)
---
.../utils/fetcher/PinotFSSegmentFetcher.java | 40 ++++
.../utils/fetcher/PinotFSSegmentFetcherTest.java | 229 +++++++++++++++++++++
2 files changed, 269 insertions(+)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java
index c11a0239c10..c0c063a9e14 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java
@@ -19,8 +19,17 @@
package org.apache.pinot.common.utils.fetcher;
import java.io.File;
+import java.io.InputStream;
import java.net.URI;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
public class PinotFSSegmentFetcher extends BaseSegmentFetcher {
@@ -34,4 +43,35 @@ public class PinotFSSegmentFetcher extends
BaseSegmentFetcher {
PinotFSFactory.create(uri.getScheme()).copyToLocalFile(uri, dest);
}
}
+
+ @Override
+ public File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long
rateLimit, AtomicInteger attempts)
+ throws Exception {
+ PinotFS pinotFS;
+ if (uri.getScheme() == null) {
+ pinotFS = PinotFSFactory.create(PinotFSFactory.LOCAL_PINOT_FS_SCHEME);
+ } else {
+ pinotFS = PinotFSFactory.create(uri.getScheme());
+ }
+ AtomicReference<File> untarredFileRef = new AtomicReference<>();
+
+ try {
+ int tries =
+ RetryPolicies.exponentialBackoffRetryPolicy(_retryCount,
_retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
+ try (InputStream inputStream = pinotFS.open(uri)) {
+ List<File> untarredFiles =
TarCompressionUtils.untarWithRateLimiter(inputStream, dest, rateLimit);
+ untarredFileRef.set(untarredFiles.get(0));
+ return true;
+ } catch (Exception e) {
+ _logger.warn("Caught exception while downloading segment from:
{} to: {}", uri, dest, e);
+ return false;
+ }
+ });
+ attempts.set(tries);
+ } catch (AttemptsExceededException | RetriableOperationException e) {
+ attempts.set(e.getAttempts());
+ throw e;
+ }
+ return untarredFileRef.get();
+ }
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcherTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcherTest.java
new file mode 100644
index 00000000000..950959ece7c
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcherTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class PinotFSSegmentFetcherTest {
+ private static final String SEGMENT_NAME = "testSegment";
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
+ PinotFSSegmentFetcherTest.class.getName());
+ private static final File DATA_DIR = new File(TEMP_DIR, "dataDir");
+ private static final File TAR_DIR = new File(TEMP_DIR, "tarDir");
+ private static final File DOWNLOAD_DIR = new File(TEMP_DIR, "downloadDir");
+
+ private PinotFSSegmentFetcher _segmentFetcher;
+
+ @BeforeMethod
+ public void setUp()
+ throws IOException {
+ FileUtils.deleteQuietly(TEMP_DIR);
+ FileUtils.forceMkdir(DATA_DIR);
+ FileUtils.forceMkdir(TAR_DIR);
+ FileUtils.forceMkdir(DOWNLOAD_DIR);
+
+ // Initialize LocalPinotFS
+ PinotFSFactory.register(PinotFSFactory.LOCAL_PINOT_FS_SCHEME,
LocalPinotFS.class.getName(),
+ new PinotConfiguration());
+
+ // Setup fetcher config with retry settings
+ PinotConfiguration fetcherConfig = new PinotConfiguration();
+ fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY, 3);
+ fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_WAIT_MS_CONFIG_KEY, 10);
+
fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY,
1.1);
+
+ _segmentFetcher = new PinotFSSegmentFetcher();
+ _segmentFetcher.init(fetcherConfig);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws IOException {
+ FileUtils.deleteDirectory(TEMP_DIR);
+ }
+
+ /**
+ * Creates a tar.gz file containing a test segment directory with some test
data
+ */
+ private File createTestSegmentTar(String segmentName, String fileContent)
+ throws Exception {
+ // Create a segment directory with a test file
+ File segmentDir = new File(DATA_DIR, segmentName);
+ FileUtils.forceMkdir(segmentDir);
+
+ File dataFile = new File(segmentDir, "index");
+ FileUtils.write(dataFile, fileContent, Charset.defaultCharset());
+
+ // Create another file to simulate a real segment structure
+ File metadataFile = new File(segmentDir, "metadata.properties");
+ FileUtils.write(metadataFile, "segment.name=" + segmentName,
Charset.defaultCharset());
+
+ // Create tar.gz file
+ File tarFile = new File(TAR_DIR, segmentName + ".tar.gz");
+ TarCompressionUtils.createCompressedTarFile(segmentDir, tarFile);
+
+ return tarFile;
+ }
+
+ @Test
+ public void testFetchUntarSegmentToLocalStreamedSuccessFirstAttempt()
+ throws Exception {
+ // Create test segment tar
+ String testContent = "This is test segment data";
+ File segmentTar = createTestSegmentTar(SEGMENT_NAME, testContent);
+
+ // Create URI for the tar file
+ URI segmentUri = segmentTar.toURI();
+
+ // Download and untar
+ AtomicInteger failedAttempts = new AtomicInteger(0);
+ File untarredSegment = _segmentFetcher.fetchUntarSegmentToLocalStreamed(
+ segmentUri, DOWNLOAD_DIR, -1, failedAttempts);
+
+ // Verify
+ assertNotNull(untarredSegment, "Untarred segment should not be null");
+ assertTrue(untarredSegment.exists(), "Untarred segment directory should
exist");
+ assertTrue(untarredSegment.isDirectory(), "Untarred segment should be a
directory");
+ assertEquals(untarredSegment.getName(), SEGMENT_NAME, "Segment directory
name should match");
+ assertEquals(failedAttempts.get(), 0, "Should succeed on first attempt");
+
+ // Verify content
+ File indexFile = new File(untarredSegment, "index");
+ assertTrue(indexFile.exists(), "Index file should exist");
+ String actualContent = FileUtils.readFileToString(indexFile,
Charset.defaultCharset());
+ assertEquals(actualContent, testContent, "File content should match");
+
+ // Verify metadata file
+ File metadataFile = new File(untarredSegment, "metadata.properties");
+ assertTrue(metadataFile.exists(), "Metadata file should exist");
+ }
+
+ @Test
+ public void testFetchUntarSegmentToLocalStreamedWithRateLimit()
+ throws Exception {
+ // Create test segment tar
+ String testContent = "This is test segment data with rate limiting";
+ File segmentTar = createTestSegmentTar(SEGMENT_NAME + "_ratelimited",
testContent);
+
+ // Create URI for the tar file
+ URI segmentUri = segmentTar.toURI();
+
+ // Download and untar with rate limit (1 MB/s)
+ AtomicInteger failedAttempts = new AtomicInteger(0);
+ long rateLimit = 1024 * 1024; // 1 MB/s
+ File untarredSegment = _segmentFetcher.fetchUntarSegmentToLocalStreamed(
+ segmentUri, DOWNLOAD_DIR, rateLimit, failedAttempts);
+
+ // Verify
+ assertNotNull(untarredSegment, "Untarred segment should not be null");
+ assertTrue(untarredSegment.exists(), "Untarred segment directory should
exist");
+ assertEquals(failedAttempts.get(), 0, "Should succeed on first attempt");
+ }
+
+ @Test
+ public void testFetchUntarSegmentToLocalStreamedWithNullScheme()
+ throws Exception {
+ // Create test segment tar
+ String testContent = "Test content for null scheme";
+ File segmentTar = createTestSegmentTar(SEGMENT_NAME + "_nullscheme",
testContent);
+
+ // Create URI without scheme (tests the if (uri.getScheme() == null)
branch)
+ URI segmentUri = new URI(null, segmentTar.getAbsolutePath(), null);
+
+ // Download and untar
+ AtomicInteger failedAttempts = new AtomicInteger(0);
+ File untarredSegment = _segmentFetcher.fetchUntarSegmentToLocalStreamed(
+ segmentUri, DOWNLOAD_DIR, -1, failedAttempts);
+
+ // Verify
+ assertNotNull(untarredSegment, "Untarred segment should not be null");
+ assertTrue(untarredSegment.exists(), "Untarred segment directory should
exist");
+ assertEquals(failedAttempts.get(), 0, "Should succeed on first attempt");
+ }
+
+ @Test(expectedExceptions = AttemptsExceededException.class)
+ public void testFetchUntarSegmentToLocalStreamedFailureExceedsRetries()
+ throws Exception {
+ // Use a non-existent file to force failures
+ URI nonExistentUri = new File(TAR_DIR, "nonexistent.tar.gz").toURI();
+
+ AtomicInteger failedAttempts = new AtomicInteger(0);
+ try {
+ _segmentFetcher.fetchUntarSegmentToLocalStreamed(
+ nonExistentUri, DOWNLOAD_DIR, -1, failedAttempts);
+ } catch (AttemptsExceededException e) {
+ // Verify that all retries were exhausted
+ assertEquals(failedAttempts.get(), 3, "Should have 3 failed retries");
+ throw e;
+ }
+ }
+
+ @Test
+ public void testFetchUntarSegmentToLocalStreamedLargeFile()
+ throws Exception {
+ // Create a larger segment to test streaming behavior
+ String segmentName = SEGMENT_NAME + "_large";
+ File segmentDir = new File(DATA_DIR, segmentName);
+ FileUtils.forceMkdir(segmentDir);
+
+ // Create a file with more content (1KB of data)
+ StringBuilder largeContent = new StringBuilder();
+ for (int i = 0; i < 100; i++) {
+ largeContent.append("This is line ").append(i).append(" of test
data.\n");
+ }
+
+ File dataFile = new File(segmentDir, "largeIndex");
+ FileUtils.write(dataFile, largeContent.toString(),
Charset.defaultCharset());
+
+ // Create tar.gz file
+ File tarFile = new File(TAR_DIR, segmentName + ".tar.gz");
+ TarCompressionUtils.createCompressedTarFile(segmentDir, tarFile);
+
+ // Download and untar
+ URI segmentUri = tarFile.toURI();
+ AtomicInteger attempts = new AtomicInteger(0);
+ File untarredSegment = _segmentFetcher.fetchUntarSegmentToLocalStreamed(
+ segmentUri, DOWNLOAD_DIR, -1, attempts);
+
+ // Verify
+ assertNotNull(untarredSegment);
+ File largeIndexFile = new File(untarredSegment, "largeIndex");
+ assertTrue(largeIndexFile.exists());
+ String actualContent = FileUtils.readFileToString(largeIndexFile,
Charset.defaultCharset());
+ assertEquals(actualContent, largeContent.toString());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]