This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3fe25ba2e2 Make segment download from Peer servers more robust by retrying both peer discovery and download. (#12317) 3fe25ba2e2 is described below commit 3fe25ba2e2babe984ac94606eddb4ab6a8b47191 Author: Ting Chen <tingc...@uber.com> AuthorDate: Thu Feb 8 09:05:12 2024 -0800 Make segment download from Peer servers more robust by retrying both peer discovery and download. (#12317) * Retry peer discovery during peer segment download. * Move the segment finder to pinot-common to avoid circular dependencies * Add unit tests and refactor the codes. * Fix lint issues. * Fix long lines for lint issues * Fix lint issues * Fix lint issues * Fix lint issues * Fix compilation issues. * Fix lint. * Remove a redundant code branch. * Fix based on comments. * Remove unused imports. * Fix lints. * Fix lints. * Revise based on comments * Revise based on comments * Revise based on comments --- .../common/utils/fetcher/BaseSegmentFetcher.java | 35 +++++ .../common/utils/fetcher/HttpSegmentFetcher.java | 23 ++- .../pinot/common/utils/fetcher/SegmentFetcher.java | 9 ++ .../pinot/core/util/PeerServerSegmentFinder.java | 0 .../utils/fetcher/HttpSegmentFetcherTest.java | 171 +++++++++++++++++++++ .../utils/fetcher/SegmentFetcherFactoryTest.java | 8 +- .../core/data/manager/BaseTableDataManager.java | 10 -- .../manager/realtime/RealtimeTableDataManager.java | 15 +- 8 files changed, 253 insertions(+), 18 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java index d1756147d4..9deca98343 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java @@ -23,6 +23,7 @@ import java.net.URI; import java.util.List; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.env.PinotConfiguration; @@ -109,6 +110,40 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher { throw new UnsupportedOperationException(); } + /** + * @param segmentName the name of the segment to fetch. + * @param uriSupplier the supplier to the list of segment download uris. + * @param dest The destination to put the downloaded segment. + * @throws Exception when the segment fetch fails after all attempts are exhausted or other runtime exceptions occur. + * This method keeps retrying (with exponential backoff) to go through the list download uris to fetch the segment + * until the retry limit is reached. + * + */ + @Override + public void fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest) throws Exception { + try { + int attempt = + RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> { + List<URI> suppliedURIs = uriSupplier.get(); + // Go through the list of URIs to fetch the segment until success. + for (URI uri : suppliedURIs) { + try { + fetchSegmentToLocalWithoutRetry(uri, dest); + return true; + } catch (Exception e) { + _logger.warn("Download segment {} from peer {} failed.", segmentName, uri, e); + } + } + // None of the URI works. Return false for retry. + return false; + }); + _logger.info("Download segment {} successfully with {} attempts.", segmentName, attempt + 1); + } catch (Exception e) { + _logger.error("Failed to download segment {} after retries.", segmentName, e); + throw e; + } + } + /** * Fetches a segment from URI location to local without retry. Sub-class should override this or * {@link #fetchSegmentToLocal(URI, File)}. diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java index be73c1908b..2b986259b5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.common.utils.fetcher; +import com.google.common.annotations.VisibleForTesting; import com.google.common.net.InetAddresses; import java.io.File; import java.io.IOException; @@ -48,6 +49,20 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher { _httpClient = new FileUploadDownloadClient(HttpClientConfig.newBuilder(config).build()); } + public HttpSegmentFetcher() { + } + + @VisibleForTesting + protected HttpSegmentFetcher(FileUploadDownloadClient httpClient, PinotConfiguration config) { + _httpClient = httpClient; + _retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY, DEFAULT_RETRY_COUNT); + _retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY, DEFAULT_RETRY_WAIT_MS); + _retryDelayScaleFactor = config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, DEFAULT_RETRY_DELAY_SCALE_FACTOR); + _logger + .info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount, _retryWaitMs, + _retryDelayScaleFactor); + } + @Override public void fetchSegmentToLocal(URI downloadURI, File dest) throws Exception { @@ -174,8 +189,12 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher { throws Exception { try { int statusCode = _httpClient.downloadFile(uri, dest, _authProvider); - _logger.info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(), - statusCode); + _logger.info("Try to download the segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, + dest.length(), statusCode); + // In case of download failure, throw exception. + if (statusCode >= 300) { + throw new HttpErrorStatusException("Failed to download segment", statusCode); + } } catch (Exception e) { _logger.warn("Caught exception while downloading segment from: {} to: {}", uri, dest, e); throw e; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java index 78d720751f..96961e4a08 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java @@ -22,6 +22,7 @@ import java.io.File; import java.net.URI; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.apache.pinot.spi.env.PinotConfiguration; @@ -49,4 +50,12 @@ public interface SegmentFetcher { */ void fetchSegmentToLocal(List<URI> uri, File dest) throws Exception; + + /** + * @param segmentName the segment name to fetch. + * @param uriSupplier the supplier to the list of segment download uris. + * @param dest The destination to put the downloaded segment. + * @throws Exception when the segment fetch fails after all attempts are exhausted or other runtime exceptions occur. + */ + void fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest) throws Exception; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java b/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java similarity index 100% rename from pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java rename to pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java new file mode 100644 index 0000000000..3159168dab --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.fetcher; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import org.apache.helix.HelixManager; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.core.util.PeerServerSegmentFinder; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.mockito.MockedStatic; +import org.testng.Assert; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + + +public class HttpSegmentFetcherTest { + private MockedStatic<PeerServerSegmentFinder> _peerServerSegmentFinder = mockStatic(PeerServerSegmentFinder.class); + private PinotConfiguration _fetcherConfig; + + @BeforeSuite + public void initTest() { + _fetcherConfig = new PinotConfiguration(); + _fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY, 3); + } + + @Test + public void testFetchSegmentToLocalSucceedAtFirstAttempt() + throws URISyntaxException, IOException, HttpErrorStatusException { + FileUploadDownloadClient client = mock(FileUploadDownloadClient.class); + when(client.downloadFile(any(), any(), any())).thenReturn(200); + HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig); + HelixManager helixManager = mock(HelixManager.class); + + List<URI> uris = new ArrayList<>(); + uris.add(new URI("http://h1:8080")); + uris.add(new URI("http://h2:8080")); + _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any())) + .thenReturn(uris); + try { + httpSegmentFetcher.fetchSegmentToLocal("seg", + () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file")); + } catch (Exception e) { + // If we reach here, the download fails. + Assert.assertTrue(false, "Download segment failed"); + Assert.assertTrue(e instanceof AttemptsExceededException); + } + _peerServerSegmentFinder.reset(); + } + + @Test + public void testFetchSegmentToLocalAllDownloadAttemptsFailed() + throws URISyntaxException, IOException, HttpErrorStatusException { + FileUploadDownloadClient client = mock(FileUploadDownloadClient.class); + // All three attempts fails. + when(client.downloadFile(any(), any(), any())).thenReturn(300).thenReturn(300).thenReturn(300); + HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig); + HelixManager helixManager = mock(HelixManager.class); + List<URI> uris = new ArrayList<>(); + uris.add(new URI("http://h1:8080")); + uris.add(new URI("http://h2:8080")); + + _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any())) + .thenReturn(uris); + try { + httpSegmentFetcher.fetchSegmentToLocal("seg", + () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file")); + // The test should not reach here because the fetch will throw exception. + Assert.assertTrue(false, "Download segment failed"); + } catch (Exception e) { + // If we reach here, the download fails. + Assert.assertTrue(true, "Download segment failed"); + } + } + + @Test + public void testFetchSegmentToLocalSuccessAfterRetry() + throws URISyntaxException, IOException, HttpErrorStatusException { + FileUploadDownloadClient client = mock(FileUploadDownloadClient.class); + // the first two attempts failed until the last attempt succeeds + when(client.downloadFile(any(), any(), any())).thenReturn(300).thenReturn(300).thenReturn(200); + HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig); + HelixManager helixManager = mock(HelixManager.class); + List<URI> uris = new ArrayList<>(); + uris.add(new URI("http://h1:8080")); + uris.add(new URI("http://h2:8080")); + + _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any())) + .thenReturn(uris); + try { + httpSegmentFetcher.fetchSegmentToLocal("seg", + () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file")); + } catch (Exception e) { + // If we reach here, the download fails. + Assert.assertTrue(false, "Download segment failed"); + } + } + + @Test + public void testFetchSegmentToLocalSuccessAfterFirstTwoAttemptsFoundNoPeerServers() + throws URISyntaxException, IOException, HttpErrorStatusException { + FileUploadDownloadClient client = mock(FileUploadDownloadClient.class); + // The download always succeeds. + when(client.downloadFile(any(), any(), any())).thenReturn(200); + HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig); + HelixManager helixManager = mock(HelixManager.class); + List<URI> uris = new ArrayList<>(); + uris.add(new URI("http://h1:8080")); + uris.add(new URI("http://h2:8080")); + + // The first two attempts find NO peers hosting the segment but the last one found two servers. + _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any())) + .thenReturn(List.of()).thenReturn(List.of()).thenReturn(uris); + try { + httpSegmentFetcher.fetchSegmentToLocal("seg", + () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file")); + } catch (Exception e) { + // If we reach here, the download fails. + Assert.assertTrue(false, "Download segment failed"); + } + } + + @Test + public void testFetchSegmentToLocalFailureWithNoPeerServers() + throws IOException, HttpErrorStatusException { + FileUploadDownloadClient client = mock(FileUploadDownloadClient.class); + // the download always succeeds. + when(client.downloadFile(any(), any(), any())).thenReturn(200); + HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig); + HelixManager helixManager = mock(HelixManager.class); + + _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any())) + .thenReturn(List.of()).thenReturn(List.of()).thenReturn(List.of()); + try { + httpSegmentFetcher.fetchSegmentToLocal("seg", + () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file")); + // The test should not reach here because the fetch will throw exception. + Assert.assertTrue(false, "Download segment failed"); + } catch (Exception e) { + Assert.assertTrue(true, "Download segment failed"); + Assert.assertTrue(e instanceof AttemptsExceededException); + } + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java index 389ccdc6d6..bc20cb84e1 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.apache.pinot.spi.crypt.PinotCrypter; import org.apache.pinot.spi.crypt.PinotCrypterFactory; import org.apache.pinot.spi.env.PinotConfiguration; @@ -125,7 +126,12 @@ public class SegmentFetcherFactoryTest { } @Override - public void fetchSegmentToLocal(List<URI> uri, File dest) + public void fetchSegmentToLocal(List<URI> uri, File dest) { + throw new UnsupportedOperationException(); + } + + @Override + public void fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest) throws Exception { throw new UnsupportedOperationException(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 191ea04a0d..bbad463d1d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -40,8 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import org.apache.commons.configuration2.Configuration; -import org.apache.commons.configuration2.ConfigurationConverter; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; @@ -75,7 +73,6 @@ import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; import org.slf4j.Logger; @@ -954,11 +951,4 @@ public abstract class BaseTableDataManager implements TableDataManager { } } } - - private static PinotConfiguration toPinotConfiguration(Configuration configuration) { - if (configuration == null) { - return new PinotConfiguration(); - } - return new PinotConfiguration((Map<String, Object>) (Map) ConfigurationConverter.getMap(configuration)); - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 37e9d88f68..d974663b20 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import java.io.File; import java.io.IOException; import java.net.URI; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -658,11 +659,15 @@ public class RealtimeTableDataManager extends BaseTableDataManager { try { tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis()); File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); - // First find servers hosting the segment in a ONLINE state. - List<URI> peerSegmentURIs = PeerServerSegmentFinder.getPeerServerURIs(segmentName, downloadScheme, _helixManager); - // Next download the segment from a randomly chosen server using configured scheme. - SegmentFetcherFactory.getSegmentFetcher(downloadScheme).fetchSegmentToLocal(peerSegmentURIs, segmentTarFile); - _logger.info("Fetched segment {} from: {} to: {} of size: {}", segmentName, peerSegmentURIs, segmentTarFile, + // Next download the segment from a randomly chosen server using configured download scheme (http or https). + SegmentFetcherFactory.getSegmentFetcher(downloadScheme).fetchSegmentToLocal(segmentName, + () -> { + List<URI> peerServerURIs = + PeerServerSegmentFinder.getPeerServerURIs(segmentName, downloadScheme, _helixManager); + Collections.shuffle(peerServerURIs); + return peerServerURIs; + }, segmentTarFile); + _logger.info("Fetched segment {} successfully to {} of size {}", segmentName, segmentTarFile, segmentTarFile.length()); untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir); } catch (Exception e) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org