This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e4d24eb89a4 fix: respect tls spec for segment push client (#18530)
e4d24eb89a4 is described below
commit e4d24eb89a4a16f997f611154b56210c75f97a33
Author: Cc <[email protected]>
AuthorDate: Thu May 28 14:32:02 2026 +0800
fix: respect tls spec for segment push client (#18530)
Segment push used the shared default FileUploadDownloadClient, so job-level
TlsSpec key/trust store and timeout settings were ignored for controller upload
calls.
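Create TLS-aware clients for the segment push and consistent-data push
controller calls, skip key/trust store auto-renewal for these short-lived
clients (their owners can close the HTTP client but do not own the renewal
executors/watch services that auto-renewal creates), and cover the public
push paths with local mTLS tests.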
Co-authored-by: wolfkill <[email protected]>
---
.../common/utils/FileUploadDownloadClient.java | 39 +-
.../apache/pinot/common/utils/tls/TlsUtils.java | 66 +++-
.../local/utils/ConsistentDataPushUtils.java | 178 +++++----
.../segment/local/utils/SegmentPushUtils.java | 404 ++++++++++++---------
.../segment/local/utils/SegmentPushUtilsTest.java | 300 +++++++++++++++
5 files changed, 721 insertions(+), 266 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 259652ad7c0..6acd94f097b 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -894,8 +894,17 @@ public class FileUploadDownloadClient implements
AutoCloseable {
public Map<String, List<String>> getSegments(URI controllerBaseUri, String
rawTableName,
@Nullable TableType tableType, boolean excludeReplacedSegments,
@Nullable AuthProvider authProvider)
throws Exception {
+ return getSegments(controllerBaseUri, rawTableName, tableType,
excludeReplacedSegments, authProvider,
+ HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ }
+
+ /// Returns table segments using an explicit socket timeout for the
controller request.
+ public Map<String, List<String>> getSegments(URI controllerBaseUri, String
rawTableName,
+ @Nullable TableType tableType, boolean excludeReplacedSegments,
@Nullable AuthProvider authProvider,
+ int socketTimeoutMs)
+ throws Exception {
return getSegments(controllerBaseUri, rawTableName, tableType,
excludeReplacedSegments, Long.MIN_VALUE,
- Long.MAX_VALUE, false, authProvider);
+ Long.MAX_VALUE, false, authProvider, socketTimeoutMs);
}
/**
@@ -916,6 +925,15 @@ public class FileUploadDownloadClient implements
AutoCloseable {
@Nullable TableType tableType, boolean excludeReplacedSegments, long
startTimestamp, long endTimestamp,
boolean excludeOverlapping, @Nullable AuthProvider authProvider)
throws Exception {
+ return getSegments(controllerBaseUri, rawTableName, tableType,
excludeReplacedSegments, startTimestamp,
+ endTimestamp, excludeOverlapping, authProvider,
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ }
+
+ /// Returns table segments using an explicit socket timeout for the
controller request.
+ public Map<String, List<String>> getSegments(URI controllerBaseUri, String
rawTableName,
+ @Nullable TableType tableType, boolean excludeReplacedSegments, long
startTimestamp, long endTimestamp,
+ boolean excludeOverlapping, @Nullable AuthProvider authProvider, int
socketTimeoutMs)
+ throws Exception {
List<String> tableTypes;
if (tableType == null) {
tableTypes = Arrays.asList(TableType.OFFLINE.toString(),
TableType.REALTIME.toString());
@@ -936,7 +954,7 @@ public class FileUploadDownloadClient implements
AutoCloseable {
RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0).attempt(()
-> {
try {
SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException(
- _httpClient.sendRequest(requestBuilder.build(),
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS));
+ _httpClient.sendRequest(requestBuilder.build(),
socketTimeoutMs));
LOGGER.info("Response {}: {} received for GET request to URI: {}",
response.getStatusCode(),
response.getResponse(), uri);
tableTypeToSegments.put(tableTypeToFilter,
@@ -1150,9 +1168,16 @@ public class FileUploadDownloadClient implements
AutoCloseable {
public SimpleHttpResponse startReplaceSegments(URI uri,
StartReplaceSegmentsRequest startReplaceSegmentsRequest,
@Nullable AuthProvider authProvider)
throws IOException, HttpErrorStatusException {
+ return startReplaceSegments(uri, startReplaceSegmentsRequest,
authProvider, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ }
+
+ /// Starts the consistent-push replace protocol using an explicit socket
timeout.
+ public SimpleHttpResponse startReplaceSegments(URI uri,
StartReplaceSegmentsRequest startReplaceSegmentsRequest,
+ @Nullable AuthProvider authProvider, int socketTimeoutMs)
+ throws IOException, HttpErrorStatusException {
return HttpClient.wrapAndThrowHttpException(_httpClient.sendRequest(
getStartReplaceSegmentsRequest(uri,
JsonUtils.objectToString(startReplaceSegmentsRequest), authProvider),
- HttpClient.DEFAULT_SOCKET_TIMEOUT_MS));
+ socketTimeoutMs));
}
/**
@@ -1197,8 +1222,14 @@ public class FileUploadDownloadClient implements
AutoCloseable {
*/
public SimpleHttpResponse revertReplaceSegments(URI uri, @Nullable
AuthProvider authProvider)
throws IOException, HttpErrorStatusException {
+ return revertReplaceSegments(uri, authProvider,
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ }
+
+ /// Reverts the consistent-push replace protocol using an explicit socket
timeout.
+ public SimpleHttpResponse revertReplaceSegments(URI uri, @Nullable
AuthProvider authProvider, int socketTimeoutMs)
+ throws IOException, HttpErrorStatusException {
return HttpClient.wrapAndThrowHttpException(_httpClient.sendRequest(
- getRevertReplaceSegmentRequest(uri, authProvider)));
+ getRevertReplaceSegmentRequest(uri, authProvider), socketTimeoutMs));
}
/**
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/tls/TlsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/tls/TlsUtils.java
index 74cf0023dd1..ec25628f51c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/tls/TlsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/tls/TlsUtils.java
@@ -36,6 +36,7 @@ import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
@@ -250,30 +251,59 @@ public final class TlsUtils {
*/
public static void installDefaultSSLSocketFactory(String keyStoreType,
String keyStorePath, String keyStorePassword,
String trustStoreType, String trustStorePath, String trustStorePassword)
{
+ SSLContext sc = createSslContext(keyStoreType, keyStorePath,
keyStorePassword, trustStoreType, trustStorePath,
+ trustStorePassword);
+ // HttpsURLConnection
+ HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
+ setSslContext(sc);
+ logTlsDiagnosticsOnce("https.default", sc, null, false);
+ }
+
+ /// Creates a client-side SSL context from key/trust store settings without
mutating JVM defaults.
+ ///
+ /// If both store paths are null, the returned context uses default key and
trust managers. If either store path is
+ /// provided, its type and password must also be provided. File-backed
stores are auto-renewed.
+ public static SSLContext createSslContext(@Nullable String keyStoreType,
@Nullable String keyStorePath,
+ @Nullable String keyStorePassword, @Nullable String trustStoreType,
@Nullable String trustStorePath,
+ @Nullable String trustStorePassword) {
+ return createSslContext(keyStoreType, keyStorePath, keyStorePassword,
trustStoreType, trustStorePath,
+ trustStorePassword, true);
+ }
+
+ /// Creates a client-side SSL context without enabling file-store
auto-renewal.
+ ///
+ /// This is intended for short-lived clients whose owners can close the HTTP
client but do not own the renewal
+ /// executors/watch services created by auto-renewal.
+ public static SSLContext createSslContextWithoutAutoRenewal(@Nullable String
keyStoreType,
+ @Nullable String keyStorePath, @Nullable String keyStorePassword,
@Nullable String trustStoreType,
+ @Nullable String trustStorePath, @Nullable String trustStorePassword) {
+ return createSslContext(keyStoreType, keyStorePath, keyStorePassword,
trustStoreType, trustStorePath,
+ trustStorePassword, false);
+ }
+
+ private static SSLContext createSslContext(@Nullable String keyStoreType,
@Nullable String keyStorePath,
+ @Nullable String keyStorePassword, @Nullable String trustStoreType,
@Nullable String trustStorePath,
+ @Nullable String trustStorePassword, boolean enableAutoRenewal) {
try {
SecureRandom secureRandom = new SecureRandom();
- SSLContext sc;
if (keyStorePath == null && trustStorePath == null) {
// When neither keyStorePath nor trustStorePath is provided, a
SSLFactory cannot be created. create SSLContext
// directly and use the default key manager and trust manager.
- sc = SSLContext.getInstance(SSL_CONTEXT_PROTOCOL);
- sc.init(null, null, secureRandom);
- } else {
- SSLFactory sslFactory =
- RenewableTlsUtils.createSSLFactory(keyStoreType, keyStorePath,
keyStorePassword, trustStoreType,
- trustStorePath, trustStorePassword, SSL_CONTEXT_PROTOCOL,
secureRandom, true, false);
- if (isKeyOrTrustStorePathNullOrHasFileScheme(keyStorePath) &&
isKeyOrTrustStorePathNullOrHasFileScheme(
- trustStorePath)) {
-
RenewableTlsUtils.enableAutoRenewalFromFileStoreForSSLFactory(sslFactory,
keyStoreType, keyStorePath,
- keyStorePassword, trustStoreType, trustStorePath,
trustStorePassword, SSL_CONTEXT_PROTOCOL, secureRandom,
- PinotInsecureMode::isPinotInInsecureMode);
- }
- sc = sslFactory.getSslContext();
+ SSLContext sslContext = SSLContext.getInstance(SSL_CONTEXT_PROTOCOL);
+ sslContext.init(null, null, secureRandom);
+ return sslContext;
+ }
+
+ SSLFactory sslFactory =
+ RenewableTlsUtils.createSSLFactory(keyStoreType, keyStorePath,
keyStorePassword, trustStoreType,
+ trustStorePath, trustStorePassword, SSL_CONTEXT_PROTOCOL,
secureRandom, true, false);
+ if (enableAutoRenewal &&
isKeyOrTrustStorePathNullOrHasFileScheme(keyStorePath)
+ && isKeyOrTrustStorePathNullOrHasFileScheme(trustStorePath)) {
+
RenewableTlsUtils.enableAutoRenewalFromFileStoreForSSLFactory(sslFactory,
keyStoreType, keyStorePath,
+ keyStorePassword, trustStoreType, trustStorePath,
trustStorePassword, SSL_CONTEXT_PROTOCOL, secureRandom,
+ PinotInsecureMode::isPinotInInsecureMode);
}
- // HttpsURLConnection
- HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
- setSslContext(sc);
- logTlsDiagnosticsOnce("https.default", sc, null, false);
+ return sslFactory.getSslContext();
} catch (GenericSSLContextException | GeneralSecurityException e) {
throw new IllegalStateException("Could not initialize SSL support", e);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
index 8d1cc9bde58..23ad163c294 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
@@ -31,7 +31,6 @@ import
org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.SimpleHttpResponse;
-import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -51,7 +50,6 @@ public class ConsistentDataPushUtils {
}
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsistentDataPushUtils.class);
- private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT =
new FileUploadDownloadClient();
private static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0);
public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
@@ -113,42 +111,48 @@ public class ConsistentDataPushUtils {
Map<URI, URI> segmentsUris = getStartReplaceSegmentUris(spec,
rawTableName);
AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
LOGGER.info("Start replace segment URIs: {}", segmentsUris);
+ FileUploadDownloadClient fileUploadDownloadClient =
SegmentPushUtils.getOrCreateFileUploadDownloadClient(spec);
+ int socketTimeoutMs = SegmentPushUtils.getSocketTimeoutMs(spec);
- for (Map.Entry<URI, URI> entry : segmentsUris.entrySet()) {
- URI controllerUri = entry.getKey();
- URI startSegmentUri = entry.getValue();
- List<String> segmentsFrom = uriToSegmentsFrom.get(controllerUri);
+ try {
+ for (Map.Entry<URI, URI> entry : segmentsUris.entrySet()) {
+ URI controllerUri = entry.getKey();
+ URI startSegmentUri = entry.getValue();
+ List<String> segmentsFrom = uriToSegmentsFrom.get(controllerUri);
- StartReplaceSegmentsRequest startReplaceSegmentsRequest =
- new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo);
- DEFAULT_RETRY_POLICY.attempt(() -> {
- try {
- SimpleHttpResponse response =
-
FILE_UPLOAD_DOWNLOAD_CLIENT.startReplaceSegments(startSegmentUri,
startReplaceSegmentsRequest,
- authProvider);
- String responseString = response.getResponse();
- LOGGER.info(
- "Got response {}: {} while sending start replace segment request
for table: {}, uploadURI: {}, request:"
- + " {}", response.getStatusCode(), responseString,
rawTableName, startSegmentUri,
- startReplaceSegmentsRequest);
- String segmentLineageEntryId =
-
JsonUtils.stringToJsonNode(responseString).get("segmentLineageEntryId").asText();
- uriToLineageEntryIdMap.put(controllerUri, segmentLineageEntryId);
- return true;
- } catch (SocketTimeoutException se) {
- // In case of the timeout, we should re-try.
- return false;
- } catch (HttpErrorStatusException e) {
- if (e.getStatusCode() >= 500) {
+ StartReplaceSegmentsRequest startReplaceSegmentsRequest =
+ new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo);
+ DEFAULT_RETRY_POLICY.attempt(() -> {
+ try {
+ SimpleHttpResponse response =
+ fileUploadDownloadClient.startReplaceSegments(startSegmentUri,
startReplaceSegmentsRequest,
+ authProvider, socketTimeoutMs);
+ String responseString = response.getResponse();
+ LOGGER.info(
+ "Got response {}: {} while sending start replace segment
request for table: {}, uploadURI: {}, request:"
+ + " {}", response.getStatusCode(), responseString,
rawTableName, startSegmentUri,
+ startReplaceSegmentsRequest);
+ String segmentLineageEntryId =
+
JsonUtils.stringToJsonNode(responseString).get("segmentLineageEntryId").asText();
+ uriToLineageEntryIdMap.put(controllerUri, segmentLineageEntryId);
+ return true;
+ } catch (SocketTimeoutException se) {
+ // In case of the timeout, we should re-try.
return false;
- } else {
- if (e.getStatusCode() ==
Response.Status.NOT_FOUND.getStatusCode()) {
- LOGGER.error("Table: {} not found when sending request: {}",
rawTableName, startSegmentUri);
+ } catch (HttpErrorStatusException e) {
+ if (e.getStatusCode() >= 500) {
+ return false;
+ } else {
+ if (e.getStatusCode() ==
Response.Status.NOT_FOUND.getStatusCode()) {
+ LOGGER.error("Table: {} not found when sending request: {}",
rawTableName, startSegmentUri);
+ }
+ throw e;
}
- throw e;
}
- }
- });
+ });
+ }
+ } finally {
+ SegmentPushUtils.closeFileUploadDownloadClient(spec,
fileUploadDownloadClient);
}
return uriToLineageEntryIdMap;
}
@@ -160,30 +164,35 @@ public class ConsistentDataPushUtils {
throws Exception {
AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
String rawTableName = spec.getTableSpec().getTableName();
- for (URI controllerUri : uriToLineageEntryIdMap.keySet()) {
- String segmentLineageEntryId = uriToLineageEntryIdMap.get(controllerUri);
- URI uri =
- FileUploadDownloadClient.getEndReplaceSegmentsURI(controllerUri,
rawTableName, TableType.OFFLINE.toString(),
- segmentLineageEntryId, false);
- DEFAULT_RETRY_POLICY.attempt(() -> {
- try {
- SimpleHttpResponse response =
- FILE_UPLOAD_DOWNLOAD_CLIENT.endReplaceSegments(uri,
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS,
- null, authProvider);
- LOGGER.info("Got response {}: {} while sending end replace segment
request for table: {}, uploadURI: {}",
- response.getStatusCode(), response.getResponse(), rawTableName,
uri);
- return true;
- } catch (SocketTimeoutException se) {
- // In case of the timeout, we should re-try.
- return false;
- } catch (HttpErrorStatusException e) {
- if (e.getStatusCode() >= 500) {
+ FileUploadDownloadClient fileUploadDownloadClient =
SegmentPushUtils.getOrCreateFileUploadDownloadClient(spec);
+ int socketTimeoutMs = SegmentPushUtils.getSocketTimeoutMs(spec);
+ try {
+ for (URI controllerUri : uriToLineageEntryIdMap.keySet()) {
+ String segmentLineageEntryId =
uriToLineageEntryIdMap.get(controllerUri);
+ URI uri =
+ FileUploadDownloadClient.getEndReplaceSegmentsURI(controllerUri,
rawTableName, TableType.OFFLINE.toString(),
+ segmentLineageEntryId, false);
+ DEFAULT_RETRY_POLICY.attempt(() -> {
+ try {
+ SimpleHttpResponse response =
+ fileUploadDownloadClient.endReplaceSegments(uri,
socketTimeoutMs, null, authProvider);
+ LOGGER.info("Got response {}: {} while sending end replace segment
request for table: {}, uploadURI: {}",
+ response.getStatusCode(), response.getResponse(),
rawTableName, uri);
+ return true;
+ } catch (SocketTimeoutException se) {
+ // In case of the timeout, we should re-try.
return false;
- } else {
- throw e;
+ } catch (HttpErrorStatusException e) {
+ if (e.getStatusCode() >= 500) {
+ return false;
+ } else {
+ throw e;
+ }
}
- }
- });
+ });
+ }
+ } finally {
+ SegmentPushUtils.closeFileUploadDownloadClient(spec,
fileUploadDownloadClient);
}
}
@@ -198,18 +207,25 @@ public class ConsistentDataPushUtils {
LOGGER.error("Exception when pushing segments. Marking segment lineage
entry to 'REVERTED'.", exception);
String rawTableName = spec.getTableSpec().getTableName();
AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
- for (Map.Entry<URI, String> entry : uriToLineageEntryIdMap.entrySet()) {
- String segmentLineageEntryId = entry.getValue();
- try {
- URI uri =
FileUploadDownloadClient.getRevertReplaceSegmentsURI(entry.getKey(),
rawTableName,
- TableType.OFFLINE.name(), segmentLineageEntryId, true);
- SimpleHttpResponse response =
FILE_UPLOAD_DOWNLOAD_CLIENT.revertReplaceSegments(uri, authProvider);
- LOGGER.info("Got response {}: {} while sending revert replace
segment request for table: {}, uploadURI: {}",
- response.getStatusCode(), response.getResponse(), rawTableName,
entry.getKey());
- } catch (URISyntaxException | HttpErrorStatusException | IOException
e) {
- LOGGER.error("Exception when sending revert replace segment request
to controller: {} for table: {}",
- entry.getKey(), rawTableName, e);
+ FileUploadDownloadClient fileUploadDownloadClient =
SegmentPushUtils.getOrCreateFileUploadDownloadClient(spec);
+ int socketTimeoutMs = SegmentPushUtils.getSocketTimeoutMs(spec);
+ try {
+ for (Map.Entry<URI, String> entry : uriToLineageEntryIdMap.entrySet())
{
+ String segmentLineageEntryId = entry.getValue();
+ try {
+ URI uri =
FileUploadDownloadClient.getRevertReplaceSegmentsURI(entry.getKey(),
rawTableName,
+ TableType.OFFLINE.name(), segmentLineageEntryId, true);
+ SimpleHttpResponse response =
+ fileUploadDownloadClient.revertReplaceSegments(uri,
authProvider, socketTimeoutMs);
+ LOGGER.info("Got response {}: {} while sending revert replace
segment request for table: {}, uploadURI: {}",
+ response.getStatusCode(), response.getResponse(),
rawTableName, entry.getKey());
+ } catch (URISyntaxException | HttpErrorStatusException | IOException
e) {
+ LOGGER.error("Exception when sending revert replace segment
request to controller: {} for table: {}",
+ entry.getKey(), rawTableName, e);
+ }
}
+ } finally {
+ SegmentPushUtils.closeFileUploadDownloadClient(spec,
fileUploadDownloadClient);
}
}
}
@@ -231,19 +247,25 @@ public class ConsistentDataPushUtils {
public static Map<URI, List<String>>
getSegmentsToReplace(SegmentGenerationJobSpec spec, String rawTableName)
throws Exception {
Map<URI, List<String>> uriToOfflineSegments = new HashMap<>();
- for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
- URI controllerURI;
- List<String> offlineSegments;
- try {
- controllerURI = new URI(pinotClusterSpec.getControllerURI());
- AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
- Map<String, List<String>> segments =
- FILE_UPLOAD_DOWNLOAD_CLIENT.getSegments(controllerURI,
rawTableName, TableType.OFFLINE, true, authProvider);
- offlineSegments = segments.get(TableType.OFFLINE.toString());
- uriToOfflineSegments.put(controllerURI, offlineSegments);
- } catch (URISyntaxException e) {
- throw new RuntimeException("Got invalid controller uri - '" +
pinotClusterSpec.getControllerURI() + "'");
+ FileUploadDownloadClient fileUploadDownloadClient =
SegmentPushUtils.getOrCreateFileUploadDownloadClient(spec);
+ int socketTimeoutMs = SegmentPushUtils.getSocketTimeoutMs(spec);
+ try {
+ for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+ URI controllerURI;
+ List<String> offlineSegments;
+ try {
+ controllerURI = new URI(pinotClusterSpec.getControllerURI());
+ AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
+ Map<String, List<String>> segments =
fileUploadDownloadClient.getSegments(controllerURI, rawTableName,
+ TableType.OFFLINE, true, authProvider, socketTimeoutMs);
+ offlineSegments = segments.get(TableType.OFFLINE.toString());
+ uriToOfflineSegments.put(controllerURI, offlineSegments);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Got invalid controller uri - '" +
pinotClusterSpec.getControllerURI() + "'");
+ }
}
+ } finally {
+ SegmentPushUtils.closeFileUploadDownloadClient(spec,
fileUploadDownloadClient);
}
return uriToOfflineSegments;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
index d4e39a11a93..aa4655e6627 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
@@ -46,6 +46,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
@@ -53,11 +54,14 @@ import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.common.utils.http.HttpClientConfig;
+import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.segment.local.constants.SegmentUploadConstants;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -65,6 +69,7 @@ import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TlsSpec;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -78,6 +83,43 @@ public class SegmentPushUtils implements Serializable {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentPushUtils.class);
private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT =
new FileUploadDownloadClient();
+ private static final String HTTP_CLIENT_CONNECTION_TIMEOUT_CONFIG =
"http.client.connectionTimeoutMs";
+
+ @VisibleForTesting
+ static FileUploadDownloadClient
getOrCreateFileUploadDownloadClient(SegmentGenerationJobSpec spec) {
+ TlsSpec tlsSpec = spec.getTlsSpec();
+ if (tlsSpec == null) {
+ return FILE_UPLOAD_DOWNLOAD_CLIENT;
+ }
+ return new FileUploadDownloadClient(getHttpClientConfig(tlsSpec),
+ TlsUtils.createSslContextWithoutAutoRenewal(tlsSpec.getKeyStoreType(),
tlsSpec.getKeyStorePath(),
+ tlsSpec.getKeyStorePassword(), tlsSpec.getTrustStoreType(),
tlsSpec.getTrustStorePath(),
+ tlsSpec.getTrustStorePassword()));
+ }
+
+ @VisibleForTesting
+ static HttpClientConfig getHttpClientConfig(TlsSpec tlsSpec) {
+ PinotConfiguration httpClientConfiguration = new PinotConfiguration();
+ httpClientConfiguration.setProperty(HTTP_CLIENT_CONNECTION_TIMEOUT_CONFIG,
tlsSpec.getConnectTimeout());
+ return HttpClientConfig.newBuilder(httpClientConfiguration).build();
+ }
+
+ static void closeFileUploadDownloadClient(SegmentGenerationJobSpec spec,
+ FileUploadDownloadClient fileUploadDownloadClient) {
+ if (spec.getTlsSpec() == null) {
+ return;
+ }
+ try {
+ fileUploadDownloadClient.close();
+ } catch (IOException e) {
+ LOGGER.warn("Unable to close TLS-aware file upload/download client", e);
+ }
+ }
+
+ static int getSocketTimeoutMs(SegmentGenerationJobSpec spec) {
+ TlsSpec tlsSpec = spec.getTlsSpec();
+ return tlsSpec != null ? tlsSpec.getReadTimeout() :
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS;
+ }
public static URI generateSegmentTarURI(URI dirURI, URI fileURI, String
prefix, String suffix) {
if (StringUtils.isEmpty(prefix) && StringUtils.isEmpty(suffix)) {
@@ -164,57 +206,63 @@ public class SegmentPushUtils implements Serializable {
LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",
Arrays.toString(tarFilePaths.subList(0, Math.min(5,
tarFilePaths.size())).toArray()),
Arrays.toString(spec.getPinotClusterSpecs()), tableName);
- for (String tarFilePath : tarFilePaths) {
- URI tarFileURI = URI.create(tarFilePath);
- File tarFile = new File(tarFilePath);
- String fileName = tarFile.getName();
-
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
- String segmentName = fileName.substring(0, fileName.length() -
Constants.TAR_GZ_FILE_EXT.length());
- for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
- URI controllerURI;
- try {
- controllerURI = new URI(pinotClusterSpec.getControllerURI());
- } catch (URISyntaxException e) {
- throw new RuntimeException("Got invalid controller uri - '" +
pinotClusterSpec.getControllerURI() + "'");
- }
- LOGGER.info("Pushing segment: {} to location: {} for table {}",
segmentName, controllerURI, tableName);
- int attempts = 1;
- if (spec.getPushJobSpec() != null &&
spec.getPushJobSpec().getPushAttempts() > 0) {
- attempts = spec.getPushJobSpec().getPushAttempts();
- }
- long retryWaitMs = 1000L;
- if (spec.getPushJobSpec() != null &&
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
- retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
- }
- RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs,
5).attempt(() -> {
- try (InputStream inputStream = fileSystem.open(tarFileURI)) {
- SimpleHttpResponse response =
-
FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
- segmentName, inputStream, headers,
- parameters, tableName, tableType);
- LOGGER.info("Response for pushing table {} segment {} to location
{} - {}: {}", tableName, segmentName,
- controllerURI, response.getStatusCode(),
response.getResponse());
- return true;
- } catch (HttpErrorStatusException e) {
- int statusCode = e.getStatusCode();
- if (statusCode >= 500) {
- // Temporary exception
- LOGGER.warn("Caught temporary exception while pushing table: {}
segment: {} to {}, will retry", tableName,
- segmentName, controllerURI, e);
- return false;
- } else {
- // Permanent exception
- LOGGER.error("Caught permanent exception while pushing table: {}
segment: {} to {}, won't retry",
- tableName, segmentName, controllerURI, e);
- throw e;
- }
- } finally {
- if (cleanUpOutputDir) {
- fileSystem.delete(tarFileURI, true);
- }
+ FileUploadDownloadClient fileUploadDownloadClient =
getOrCreateFileUploadDownloadClient(spec);
+ int socketTimeoutMs = getSocketTimeoutMs(spec);
+ try {
+ for (String tarFilePath : tarFilePaths) {
+ URI tarFileURI = URI.create(tarFilePath);
+ File tarFile = new File(tarFilePath);
+ String fileName = tarFile.getName();
+
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
+ String segmentName = fileName.substring(0, fileName.length() -
Constants.TAR_GZ_FILE_EXT.length());
+ for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+ URI controllerURI;
+ try {
+ controllerURI = new URI(pinotClusterSpec.getControllerURI());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Got invalid controller uri - '" +
pinotClusterSpec.getControllerURI() + "'");
}
- });
+ LOGGER.info("Pushing segment: {} to location: {} for table {}",
segmentName, controllerURI, tableName);
+ int attempts = 1;
+ if (spec.getPushJobSpec() != null &&
spec.getPushJobSpec().getPushAttempts() > 0) {
+ attempts = spec.getPushJobSpec().getPushAttempts();
+ }
+ long retryWaitMs = 1000L;
+ if (spec.getPushJobSpec() != null &&
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+ retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+ }
+ RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs,
5).attempt(() -> {
+ try (InputStream inputStream = fileSystem.open(tarFileURI)) {
+ SimpleHttpResponse response =
+
fileUploadDownloadClient.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
+ segmentName, inputStream, headers,
makeUploadSegmentParams(parameters, tableName, tableType),
+ socketTimeoutMs);
+ LOGGER.info("Response for pushing table {} segment {} to
location {} - {}: {}", tableName, segmentName,
+ controllerURI, response.getStatusCode(),
response.getResponse());
+ return true;
+ } catch (HttpErrorStatusException e) {
+ int statusCode = e.getStatusCode();
+ if (statusCode >= 500) {
+ // Temporary exception
+ LOGGER.warn("Caught temporary exception while pushing table:
{} segment: {} to {}, will retry",
+ tableName, segmentName, controllerURI, e);
+ return false;
+ } else {
+ // Permanent exception
+ LOGGER.error("Caught permanent exception while pushing table:
{} segment: {} to {}, won't retry",
+ tableName, segmentName, controllerURI, e);
+ throw e;
+ }
+ } finally {
+ if (cleanUpOutputDir) {
+ fileSystem.delete(tarFileURI, true);
+ }
+ }
+ });
+ }
}
+ } finally {
+ closeFileUploadDownloadClient(spec, fileUploadDownloadClient);
}
}
@@ -225,53 +273,59 @@ public class SegmentPushUtils implements Serializable {
LOGGER.info("Start sending table {} segment URIs: {} to locations: {}",
tableName,
Arrays.toString(segmentUris.subList(0, Math.min(5,
segmentUris.size())).toArray()),
Arrays.toString(spec.getPinotClusterSpecs()));
- for (String segmentUri : segmentUris) {
- URI segmentURI = URI.create(segmentUri);
- PinotFS outputDirFS = PinotFSFactory.create(segmentURI.getScheme());
- for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
- URI controllerURI;
- try {
- controllerURI = new URI(pinotClusterSpec.getControllerURI());
- } catch (URISyntaxException e) {
- throw new RuntimeException("Got invalid controller uri - '" +
pinotClusterSpec.getControllerURI() + "'");
- }
- LOGGER.info("Sending table {} segment URI: {} to location: {} for ",
tableName, segmentUri, controllerURI);
- int attempts = 1;
- if (spec.getPushJobSpec() != null &&
spec.getPushJobSpec().getPushAttempts() > 0) {
- attempts = spec.getPushJobSpec().getPushAttempts();
- }
- long retryWaitMs = 1000L;
- if (spec.getPushJobSpec() != null &&
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
- retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
- }
- RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs,
5).attempt(() -> {
+ FileUploadDownloadClient fileUploadDownloadClient =
getOrCreateFileUploadDownloadClient(spec);
+ int socketTimeoutMs = getSocketTimeoutMs(spec);
+ try {
+ for (String segmentUri : segmentUris) {
+ URI segmentURI = URI.create(segmentUri);
+ PinotFS outputDirFS = PinotFSFactory.create(segmentURI.getScheme());
+ for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+ URI controllerURI;
try {
- SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-
.sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
segmentUri,
- headers, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
- LOGGER.info("Response for pushing table {} segment uri {} to
location {} - {}: {}", tableName, segmentUri,
- controllerURI, response.getStatusCode(),
response.getResponse());
- return true;
- } catch (HttpErrorStatusException e) {
- int statusCode = e.getStatusCode();
- if (statusCode >= 500) {
- // Temporary exception
- LOGGER.warn("Caught temporary exception while pushing table: {}
segment uri: {} to {}, will retry",
- tableName, segmentUri, controllerURI, e);
- return false;
- } else {
- // Permanent exception
- LOGGER.error("Caught permanent exception while pushing table: {}
segment uri: {} to {}, won't retry",
- tableName, segmentUri, controllerURI, e);
- throw e;
- }
- } finally {
- if (spec.isCleanUpOutputDir()) {
- outputDirFS.delete(segmentURI, true);
- }
+ controllerURI = new URI(pinotClusterSpec.getControllerURI());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Got invalid controller uri - '" +
pinotClusterSpec.getControllerURI() + "'");
}
- });
+ LOGGER.info("Sending table {} segment URI: {} to location: {} for ",
tableName, segmentUri, controllerURI);
+ int attempts = 1;
+ if (spec.getPushJobSpec() != null &&
spec.getPushJobSpec().getPushAttempts() > 0) {
+ attempts = spec.getPushJobSpec().getPushAttempts();
+ }
+ long retryWaitMs = 1000L;
+ if (spec.getPushJobSpec() != null &&
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+ retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+ }
+ RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs,
5).attempt(() -> {
+ try {
+ SimpleHttpResponse response = fileUploadDownloadClient
+
.sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
segmentUri,
+ headers, parameters, socketTimeoutMs);
+ LOGGER.info("Response for pushing table {} segment uri {} to
location {} - {}: {}", tableName,
+ segmentUri, controllerURI, response.getStatusCode(),
response.getResponse());
+ return true;
+ } catch (HttpErrorStatusException e) {
+ int statusCode = e.getStatusCode();
+ if (statusCode >= 500) {
+ // Temporary exception
+ LOGGER.warn("Caught temporary exception while pushing table:
{} segment uri: {} to {}, will retry",
+ tableName, segmentUri, controllerURI, e);
+ return false;
+ } else {
+ // Permanent exception
+ LOGGER.error("Caught permanent exception while pushing table:
{} segment uri: {} to {}, won't retry",
+ tableName, segmentUri, controllerURI, e);
+ throw e;
+ }
+ } finally {
+ if (spec.isCleanUpOutputDir()) {
+ outputDirFS.delete(segmentURI, true);
+ }
+ }
+ });
+ }
}
+ } finally {
+ closeFileUploadDownloadClient(spec, fileUploadDownloadClient);
}
}
@@ -295,83 +349,90 @@ public class SegmentPushUtils implements Serializable {
String tableName = spec.getTableSpec().getTableName();
LOGGER.info("Start pushing segment metadata: {} to locations: {} for table
{}", segmentUriToTarPathMap,
Arrays.toString(spec.getPinotClusterSpecs()), tableName);
- for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
- String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
- String fileName = new File(tarFilePath).getName();
- // segments stored in Pinot deep store do not have .tar.gz extension
- String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
- ? fileName.substring(0, fileName.length() -
Constants.TAR_GZ_FILE_EXT.length()) : fileName;
- SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
- File segmentMetadataFile;
- // Check if there is a segment metadata tar gz file named
`segmentName.metadata.tar.gz`, already in the remote
- // directory. This is to avoid generating a new segment metadata tar gz
file every time we push a segment,
- // which requires downloading the entire segment tar gz file.
-
- URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath,
segmentName);
- LOGGER.info("Checking if metadata tar gz file {} exists",
metadataTarGzFilePath);
- if (spec.getPushJobSpec().isPreferMetadataTarGz() &&
fileSystem.exists(metadataTarGzFilePath)) {
- segmentMetadataFile = new File(FileUtils.getTempDirectory(),
- "segmentMetadata-" + UUID.randomUUID() +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
- if (segmentMetadataFile.exists()) {
- FileUtils.forceDelete(segmentMetadataFile);
- }
- fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
- } else {
- segmentMetadataFile = generateSegmentMetadataFile(fileSystem,
URI.create(tarFilePath));
- }
- try {
- for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
- URI controllerURI;
- try {
- controllerURI = new URI(pinotClusterSpec.getControllerURI());
- } catch (URISyntaxException e) {
- throw new RuntimeException("Got invalid controller uri - '" +
pinotClusterSpec.getControllerURI() + "'");
- }
- LOGGER.info("Pushing segment: {} to location: {} for table {}",
segmentName, controllerURI, tableName);
- int attempts = 1;
- if (spec.getPushJobSpec() != null &&
spec.getPushJobSpec().getPushAttempts() > 0) {
- attempts = spec.getPushJobSpec().getPushAttempts();
- }
- long retryWaitMs = 1000L;
- if (spec.getPushJobSpec() != null &&
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
- retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+ FileUploadDownloadClient fileUploadDownloadClient =
getOrCreateFileUploadDownloadClient(spec);
+ int socketTimeoutMs = getSocketTimeoutMs(spec);
+ try {
+ for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+ String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+ String fileName = new File(tarFilePath).getName();
+ // segments stored in Pinot deep store do not have .tar.gz extension
+ String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+ ? fileName.substring(0, fileName.length() -
Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+ SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+ File segmentMetadataFile;
+ // Check if there is a segment metadata tar gz file named
`segmentName.metadata.tar.gz`, already in the remote
+ // directory. This is to avoid generating a new segment metadata tar
gz file every time we push a segment,
+ // which requires downloading the entire segment tar gz file.
+
+ URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath,
segmentName);
+ LOGGER.info("Checking if metadata tar gz file {} exists",
metadataTarGzFilePath);
+ if (spec.getPushJobSpec().isPreferMetadataTarGz() &&
fileSystem.exists(metadataTarGzFilePath)) {
+ segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+ "segmentMetadata-" + UUID.randomUUID() +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ if (segmentMetadataFile.exists()) {
+ FileUtils.forceDelete(segmentMetadataFile);
}
- RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs,
5).attempt(() -> {
- List<Header> reqHttpHeaders = new ArrayList<>(headers);
+ fileSystem.copyToLocalFile(metadataTarGzFilePath,
segmentMetadataFile);
+ } else {
+ segmentMetadataFile = generateSegmentMetadataFile(fileSystem,
URI.create(tarFilePath));
+ }
+ try {
+ for (PinotClusterSpec pinotClusterSpec :
spec.getPinotClusterSpecs()) {
+ URI controllerURI;
try {
- reqHttpHeaders.add(new
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
segmentUriPath));
- reqHttpHeaders.add(new
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
-
FileUploadDownloadClient.FileUploadType.METADATA.toString()));
- if (spec.getPushJobSpec() != null) {
- reqHttpHeaders.add(new
BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
-
String.valueOf(spec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
- }
+ controllerURI = new URI(pinotClusterSpec.getControllerURI());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Got invalid controller uri - '" +
pinotClusterSpec.getControllerURI() + "'");
+ }
+ LOGGER.info("Pushing segment: {} to location: {} for table {}",
segmentName, controllerURI, tableName);
+ int attempts = 1;
+ if (spec.getPushJobSpec() != null &&
spec.getPushJobSpec().getPushAttempts() > 0) {
+ attempts = spec.getPushJobSpec().getPushAttempts();
+ }
+ long retryWaitMs = 1000L;
+ if (spec.getPushJobSpec() != null &&
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+ retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+ }
+ RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs,
5).attempt(() -> {
+ List<Header> reqHttpHeaders = new ArrayList<>(headers);
+ try {
+ reqHttpHeaders.add(
+ new
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
segmentUriPath));
+ reqHttpHeaders.add(new
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+
FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+ if (spec.getPushJobSpec() != null) {
+ reqHttpHeaders.add(new
BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
+
String.valueOf(spec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
+ }
- SimpleHttpResponse response =
FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(
- FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
segmentName,
- segmentMetadataFile, reqHttpHeaders, parameters,
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
- LOGGER.info("Response for pushing table {} segment {} to
location {} - {}: {}", tableName, segmentName,
- controllerURI, response.getStatusCode(),
response.getResponse());
- return true;
- } catch (HttpErrorStatusException e) {
- int statusCode = e.getStatusCode();
- if (statusCode >= 500) {
- // Temporary exception
- LOGGER.warn("Caught temporary exception while pushing table:
{} segment: {} to {}, will retry",
- tableName, segmentName, controllerURI, e);
- return false;
- } else {
- // Permanent exception
- LOGGER.error("Caught permanent exception while pushing table:
{} segment: {} to {}, won't retry",
- tableName, segmentName, controllerURI, e);
- throw e;
+ SimpleHttpResponse response =
fileUploadDownloadClient.uploadSegmentMetadata(
+
FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName,
+ segmentMetadataFile, reqHttpHeaders, parameters,
socketTimeoutMs);
+ LOGGER.info("Response for pushing table {} segment {} to
location {} - {}: {}", tableName, segmentName,
+ controllerURI, response.getStatusCode(),
response.getResponse());
+ return true;
+ } catch (HttpErrorStatusException e) {
+ int statusCode = e.getStatusCode();
+ if (statusCode >= 500) {
+ // Temporary exception
+ LOGGER.warn("Caught temporary exception while pushing table:
{} segment: {} to {}, will retry",
+ tableName, segmentName, controllerURI, e);
+ return false;
+ } else {
+ // Permanent exception
+ LOGGER.error("Caught permanent exception while pushing
table: {} segment: {} to {}, won't retry",
+ tableName, segmentName, controllerURI, e);
+ throw e;
+ }
}
- }
- });
+ });
+ }
+ } finally {
+ FileUtils.deleteQuietly(segmentMetadataFile);
}
- } finally {
- FileUtils.deleteQuietly(segmentMetadataFile);
}
+ } finally {
+ closeFileUploadDownloadClient(spec, fileUploadDownloadClient);
}
}
@@ -384,6 +445,8 @@ public class SegmentPushUtils implements Serializable {
Map<String, File> allSegmentsMetadataMap = new HashMap<>();
File allSegmentsMetadataTarFile = null;
int nThreads =
spec.getPushJobSpec().getSegmentMetadataGenerationParallelism();
+ FileUploadDownloadClient fileUploadDownloadClient =
getOrCreateFileUploadDownloadClient(spec);
+ int socketTimeoutMs = getSocketTimeoutMs(spec);
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
LOGGER.info("Start pushing segment metadata: {} to locations: {} for
table: {} with parallelism: {}",
segmentUriToTarPathMap, Arrays.toString(spec.getPinotClusterSpecs()),
tableName,
@@ -419,8 +482,8 @@ public class SegmentPushUtils implements Serializable {
try {
addHeaders(spec, reqHttpHeaders);
URI segmentUploadURI = getBatchSegmentUploadURI(controllerURI);
- SimpleHttpResponse response =
FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadataFiles(segmentUploadURI,
- allSegmentsMetadataMap, reqHttpHeaders, parameters,
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ SimpleHttpResponse response =
fileUploadDownloadClient.uploadSegmentMetadataFiles(segmentUploadURI,
+ allSegmentsMetadataMap, reqHttpHeaders, parameters,
socketTimeoutMs);
LOGGER.info("Response for pushing table {} segments {} to location
{} - {}: {}", tableName,
segmentMetadataFileMap.keySet(), controllerURI,
response.getStatusCode(), response.getResponse());
return true;
@@ -447,6 +510,7 @@ public class SegmentPushUtils implements Serializable {
if (allSegmentsMetadataTarFile != null) {
FileUtils.deleteQuietly(allSegmentsMetadataTarFile);
}
+ closeFileUploadDownloadClient(spec, fileUploadDownloadClient);
executor.shutdown();
}
}
@@ -522,6 +586,14 @@ public class SegmentPushUtils implements Serializable {
}
}
+ private static List<NameValuePair>
makeUploadSegmentParams(List<NameValuePair> parameters, String tableName,
+ TableType tableType) {
+ List<NameValuePair> requestParams = parameters == null ? new ArrayList<>()
: new ArrayList<>(parameters);
+ requestParams.add(new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
tableName));
+ requestParams.add(new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
tableType.name()));
+ return requestParams;
+ }
+
// Method helps create an uber tar file which contains the metadata files
for all segments that are to be uploaded.
// Additionally, it contains a segmentName to segmentDownloadURI mapping
file which allows us to avoid sending the
// segmentDownloadURI as a header field as there are limitations on the
number of headers allowed in the http request.
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
index 8a86283e15f..0bf6005b8c3 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
@@ -18,11 +18,22 @@
*/
package org.apache.pinot.segment.local.utils;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsParameters;
+import com.sun.net.httpserver.HttpsServer;
import java.io.File;
import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.security.SecureRandom;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -30,15 +41,28 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.io.FileUtils;
+import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.http.HttpClientConfig;
+import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TlsSpec;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -49,10 +73,22 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
public class SegmentPushUtilsTest {
+ private static final String TLS_RESOURCE_FOLDER = "tls/";
+ private static final String TLS_KEYSTORE_FILE = "keystore.p12";
+ private static final String TLS_TRUSTSTORE_FILE = "truststore.p12";
+ private static final String TLS_STORE_PASSWORD = "changeit";
+ private static final String TLS_STORE_TYPE = "PKCS12";
+ private static final String TEST_TABLE_NAME = "testTable";
+ private static final String TEST_SEGMENT_NAME = "testSegment";
+ private static final String TEST_SEGMENT_URI =
"file:///tmp/testSegment.tar.gz";
+ private static final String TEST_LINEAGE_ENTRY_ID = "lineageEntry";
+ private static final String OK_RESPONSE = "OK";
+
private File _tempDir;
@BeforeMethod
@@ -83,6 +119,270 @@ public class SegmentPushUtilsTest {
Assert.assertEquals(nameValuePair.getValue(), "true");
}
+ @Test
+ public void testSendSegmentUrisHonorsTlsSpec()
+ throws Exception {
+ AtomicBoolean receivedTlsRequest = new AtomicBoolean();
+ HttpsServer httpsServer =
+ createHttpsServer("/v2/segments", new
TestSegmentUploadHandler(receivedTlsRequest));
+ try {
+ URI controllerUri = new URI("https://localhost:" +
httpsServer.getAddress().getPort());
+
+ SegmentGenerationJobSpec defaultJobSpec =
createSegmentGenerationJobSpec(controllerUri, null);
+ assertThrows(Exception.class, () ->
SegmentPushUtils.sendSegmentUris(defaultJobSpec, List.of(TEST_SEGMENT_URI)));
+
+ SegmentGenerationJobSpec tlsJobSpec =
createSegmentGenerationJobSpec(controllerUri, createTlsSpec());
+ SegmentPushUtils.sendSegmentUris(tlsJobSpec, List.of(TEST_SEGMENT_URI));
+ assertTrue(receivedTlsRequest.get());
+ } finally {
+ httpsServer.stop(0);
+ }
+ }
+
+ @Test
+ public void testConsistentDataPushGetSegmentsToReplaceHonorsTlsSpec()
+ throws Exception {
+ AtomicBoolean receivedTlsRequest = new AtomicBoolean();
+ HttpsServer httpsServer =
+ createHttpsServer("/segments/" + TEST_TABLE_NAME, new
TestSegmentListHandler(receivedTlsRequest));
+ try {
+ URI controllerUri = new URI("https://localhost:" +
httpsServer.getAddress().getPort());
+
+ SegmentGenerationJobSpec tlsJobSpec =
createSegmentGenerationJobSpec(controllerUri, createTlsSpec());
+ Map<URI, List<String>> segmentsToReplace =
+ ConsistentDataPushUtils.getSegmentsToReplace(tlsJobSpec,
TEST_TABLE_NAME);
+
+ assertEquals(segmentsToReplace.get(controllerUri),
List.of(TEST_SEGMENT_NAME));
+ assertTrue(receivedTlsRequest.get());
+ } finally {
+ httpsServer.stop(0);
+ }
+ }
+
+ @Test(timeOut = 5000)
+ public void testSendSegmentUrisHonorsTlsSpecReadTimeout()
+ throws Exception {
+ AtomicBoolean receivedTlsRequest = new AtomicBoolean();
+ CountDownLatch releaseResponse = new CountDownLatch(1);
+ HttpsServer httpsServer =
+ createHttpsServer("/v2/segments", new
TestSegmentUploadHandler(receivedTlsRequest, releaseResponse));
+ try {
+ URI controllerUri = new URI("https://localhost:" +
httpsServer.getAddress().getPort());
+ TlsSpec tlsSpec = createTlsSpec();
+ tlsSpec.setReadTimeout(100);
+ SegmentGenerationJobSpec tlsJobSpec =
createSegmentGenerationJobSpec(controllerUri, tlsSpec);
+
+ assertThrows(Exception.class, () ->
SegmentPushUtils.sendSegmentUris(tlsJobSpec, List.of(TEST_SEGMENT_URI)));
+ assertTrue(receivedTlsRequest.get());
+ } finally {
+ releaseResponse.countDown();
+ httpsServer.stop(0);
+ }
+ }
+
+ @Test
+ public void testTlsSpecConnectTimeoutConfigIsApplied() {
+ TlsSpec tlsSpec = createTlsSpec();
+ tlsSpec.setConnectTimeout(1234);
+
+ HttpClientConfig httpClientConfig =
SegmentPushUtils.getHttpClientConfig(tlsSpec);
+
+ assertEquals(httpClientConfig.getConnectionTimeoutMs(), 1234);
+ }
+
+ @Test
+ public void testConsistentDataPushStartEndRevertHonorsTlsSpec()
+ throws Exception {
+ AtomicBoolean receivedStartRequest = new AtomicBoolean();
+ AtomicBoolean receivedEndRequest = new AtomicBoolean();
+ AtomicBoolean receivedRevertRequest = new AtomicBoolean();
+ HttpsServer httpsServer = createHttpsServer(Map.of(
+ "/segments/" + TEST_TABLE_NAME + "/startReplaceSegments",
+ new TestConsistentDataPushHandler(receivedStartRequest,
+ "{\"segmentLineageEntryId\":\"" + TEST_LINEAGE_ENTRY_ID + "\"}",
"type=OFFLINE", "forceCleanup=true"),
+ "/segments/" + TEST_TABLE_NAME + "/endReplaceSegments",
+ new TestConsistentDataPushHandler(receivedEndRequest, OK_RESPONSE,
+ "segmentLineageEntryId=" + TEST_LINEAGE_ENTRY_ID, "cleanup=false"),
+ "/segments/" + TEST_TABLE_NAME + "/revertReplaceSegments",
+ new TestConsistentDataPushHandler(receivedRevertRequest, OK_RESPONSE,
+ "segmentLineageEntryId=" + TEST_LINEAGE_ENTRY_ID,
"forceRevert=true")));
+ try {
+ URI controllerUri = new URI("https://localhost:" +
httpsServer.getAddress().getPort());
+ SegmentGenerationJobSpec tlsJobSpec =
createSegmentGenerationJobSpec(controllerUri, createTlsSpec());
+
+ Map<URI, String> lineageEntryIds =
ConsistentDataPushUtils.startReplaceSegments(tlsJobSpec,
+ Map.of(controllerUri, List.of("oldSegment")),
List.of(TEST_SEGMENT_NAME));
+ assertEquals(lineageEntryIds.get(controllerUri), TEST_LINEAGE_ENTRY_ID);
+
+ ConsistentDataPushUtils.endReplaceSegments(tlsJobSpec, lineageEntryIds);
+ ConsistentDataPushUtils.handleUploadException(tlsJobSpec,
lineageEntryIds, new RuntimeException("test"));
+
+ assertTrue(receivedStartRequest.get());
+ assertTrue(receivedEndRequest.get());
+ assertTrue(receivedRevertRequest.get());
+ } finally {
+ httpsServer.stop(0);
+ }
+ }
+
+ private static HttpsServer createHttpsServer(String path, HttpHandler
handler)
+ throws Exception {
+ return createHttpsServer(Map.of(path, handler));
+ }
+
+ private static HttpsServer createHttpsServer(Map<String, HttpHandler>
handlers)
+ throws Exception {
+ SSLContext sslContext = createTestSslContext();
+ HttpsServer server = HttpsServer.create(new InetSocketAddress("localhost",
0), 0);
+ server.setHttpsConfigurator(new HttpsConfigurator(sslContext) {
+ @Override
+ public void configure(HttpsParameters params) {
+ SSLParameters sslParameters = sslContext.getDefaultSSLParameters();
+ sslParameters.setNeedClientAuth(true);
+ params.setSSLParameters(sslParameters);
+ }
+ });
+ handlers.forEach(server::createContext);
+ server.setExecutor(null);
+ server.start();
+ return server;
+ }
+
+ private static SegmentGenerationJobSpec createSegmentGenerationJobSpec(URI
controllerUri, TlsSpec tlsSpec) {
+ TableSpec tableSpec = new TableSpec();
+ tableSpec.setTableName(TEST_TABLE_NAME);
+
+ PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+ pinotClusterSpec.setControllerURI(controllerUri.toString());
+
+ SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+ jobSpec.setTableSpec(tableSpec);
+ jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{pinotClusterSpec});
+ jobSpec.setPushJobSpec(new PushJobSpec());
+ jobSpec.setTlsSpec(tlsSpec);
+ return jobSpec;
+ }
+
+ private static SSLContext createTestSslContext()
+ throws Exception {
+ KeyManagerFactory keyManagerFactory =
+
TlsUtils.createKeyManagerFactory(getTlsResourcePath(TLS_KEYSTORE_FILE),
TLS_STORE_PASSWORD, TLS_STORE_TYPE);
+ TrustManagerFactory trustManagerFactory =
+
TlsUtils.createTrustManagerFactory(getTlsResourcePath(TLS_TRUSTSTORE_FILE),
TLS_STORE_PASSWORD, TLS_STORE_TYPE);
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(keyManagerFactory.getKeyManagers(),
trustManagerFactory.getTrustManagers(), new SecureRandom());
+ return sslContext;
+ }
+
+ private static TlsSpec createTlsSpec() {
+ TlsSpec tlsSpec = new TlsSpec();
+ tlsSpec.setKeyStoreType(TLS_STORE_TYPE);
+ tlsSpec.setKeyStorePath(getTlsResourcePath(TLS_KEYSTORE_FILE));
+ tlsSpec.setKeyStorePassword(TLS_STORE_PASSWORD);
+ tlsSpec.setTrustStoreType(TLS_STORE_TYPE);
+ tlsSpec.setTrustStorePath(getTlsResourcePath(TLS_TRUSTSTORE_FILE));
+ tlsSpec.setTrustStorePassword(TLS_STORE_PASSWORD);
+ return tlsSpec;
+ }
+
+ private static String getTlsResourcePath(String fileName) {
+ URL resource =
SegmentPushUtilsTest.class.getClassLoader().getResource(TLS_RESOURCE_FOLDER +
fileName);
+ Assert.assertNotNull(resource, "Missing TLS test resource: " + fileName);
+ return resource.toString();
+ }
+
+ private static class TestSegmentUploadHandler implements HttpHandler {
+ private final AtomicBoolean _receivedTlsRequest;
+ private final CountDownLatch _releaseResponse;
+
+ private TestSegmentUploadHandler(AtomicBoolean receivedTlsRequest) {
+ this(receivedTlsRequest, null);
+ }
+
+ private TestSegmentUploadHandler(AtomicBoolean receivedTlsRequest,
CountDownLatch releaseResponse) {
+ _receivedTlsRequest = receivedTlsRequest;
+ _releaseResponse = releaseResponse;
+ }
+
+ @Override
+ public void handle(HttpExchange httpExchange)
+ throws IOException {
+ Headers requestHeaders = httpExchange.getRequestHeaders();
+
assertEquals(requestHeaders.getFirst(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE),
+ FileUploadDownloadClient.FileUploadType.URI.toString());
+
assertEquals(requestHeaders.getFirst(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI),
TEST_SEGMENT_URI);
+ _receivedTlsRequest.set(true);
+ if (_releaseResponse != null) {
+ try {
+ _releaseResponse.await(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ }
+
+ writeResponse(httpExchange, OK_RESPONSE);
+ }
+ }
+
+ private static class TestConsistentDataPushHandler implements HttpHandler {
+ private final AtomicBoolean _receivedTlsRequest;
+ private final String _responseBody;
+ private final List<String> _expectedQueryFragments;
+
+ private TestConsistentDataPushHandler(AtomicBoolean receivedTlsRequest,
String responseBody,
+ String... expectedQueryFragments) {
+ _receivedTlsRequest = receivedTlsRequest;
+ _responseBody = responseBody;
+ _expectedQueryFragments = Arrays.asList(expectedQueryFragments);
+ }
+
+ @Override
+ public void handle(HttpExchange httpExchange)
+ throws IOException {
+ assertEquals(httpExchange.getRequestMethod(), "POST");
+ String query = httpExchange.getRequestURI().getQuery();
+ for (String expectedQueryFragment : _expectedQueryFragments) {
+ assertTrue(query.contains(expectedQueryFragment), query);
+ }
+ _receivedTlsRequest.set(true);
+
+ writeResponse(httpExchange, _responseBody);
+ }
+ }
+
+ private static class TestSegmentListHandler implements HttpHandler {
+ private final AtomicBoolean _receivedTlsRequest;
+
+ private TestSegmentListHandler(AtomicBoolean receivedTlsRequest) {
+ _receivedTlsRequest = receivedTlsRequest;
+ }
+
+ @Override
+ public void handle(HttpExchange httpExchange)
+ throws IOException {
+ _receivedTlsRequest.set(true);
+ writeResponse(httpExchange, "[{\"OFFLINE\":[\"" + TEST_SEGMENT_NAME +
"\"]}]");
+ }
+ }
+
+ private static void writeResponse(HttpExchange httpExchange, String
responseBody)
+ throws IOException {
+ byte[] response = responseBody.getBytes(StandardCharsets.UTF_8);
+ httpExchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
+ try (OutputStream os = httpExchange.getResponseBody()) {
+ os.write(response);
+ }
+ }
+
+ @Test
+ public void testGetOrCreateFileUploadDownloadClientUsesSharedDefaultClient()
+ throws Exception {
+ SegmentGenerationJobSpec defaultJobSpec = new SegmentGenerationJobSpec();
+ FileUploadDownloadClient defaultClient =
SegmentPushUtils.getOrCreateFileUploadDownloadClient(defaultJobSpec);
+
Assert.assertSame(SegmentPushUtils.getOrCreateFileUploadDownloadClient(defaultJobSpec),
defaultClient);
+ }
+
@Test
public void testGetSegmentUriToTarPathMap()
throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]