This is an automated email from the ASF dual-hosted git repository. apucher pushed a commit to branch pinot-flexible-auth-provider in repository https://gitbox.apache.org/repos/asf/pinot.git
commit c20007e6dc95581cc3acb17fb0969cc721ca4238 Author: Alexander Pucher <[email protected]> AuthorDate: Mon May 9 18:03:08 2022 -0700 pinot auth provider draft --- .../pinot/common/auth/AuthProviderUtils.java | 117 +++++++++++++++++++++ .../apache/pinot/common/auth/NullAuthProvider.java | 19 ++-- .../pinot/common/auth/StaticTokenAuthProvider.java | 34 ++++-- .../apache/pinot/common/auth/UrlAuthProvider.java | 65 ++++++++++++ .../common/utils/FileUploadDownloadClient.java | 43 ++++---- .../common/utils/fetcher/BaseSegmentFetcher.java | 7 +- .../common/utils/fetcher/HttpSegmentFetcher.java | 4 +- .../utils/fetcher/SegmentFetcherFactory.java | 6 ++ .../apache/pinot/common/utils/http/HttpClient.java | 70 +++++------- .../api/access/AuthenticationFilter.java | 3 +- .../resources/PinotIngestionRestletResource.java | 11 +- .../pinot/controller/util/FileIngestionHelper.java | 10 +- .../core/data/manager/BaseTableDataManager.java | 7 +- .../manager/realtime/SegmentCommitterFactory.java | 2 +- .../realtime/Server2ControllerSegmentUploader.java | 11 +- .../ServerSegmentCompletionProtocolHandler.java | 19 ++-- .../tests/BasicAuthBatchIntegrationTest.java | 3 +- .../BaseMultipleSegmentsConversionExecutor.java | 20 ++-- .../tasks/BaseSingleSegmentConversionExecutor.java | 7 +- .../minion/tasks/SegmentConversionUtils.java | 17 +-- .../segmentuploader/SegmentUploaderDefault.java | 10 +- .../local/data/manager/TableDataManagerConfig.java | 6 ++ .../pinot/segment/local/utils/IngestionUtils.java | 16 +-- .../segment/local/utils/SegmentPushUtils.java | 12 ++- .../helix/HelixInstanceDataManagerConfig.java | 7 ++ .../auth/{AuthContext.java => AuthProvider.java} | 21 ++-- .../config/instance/InstanceDataManagerConfig.java | 2 + .../batch/spec/SegmentGenerationJobSpec.java | 6 ++ .../segment/uploader/SegmentUploader.java | 10 +- .../apache/pinot/spi/utils/CommonConstants.java | 2 + .../org/apache/pinot/tools/BootstrapTableTool.java | 10 +- .../admin/command/AbstractBaseAdminCommand.java | 7 +- .../tools/admin/command/AddSchemaCommand.java | 6 +- .../pinot/tools/admin/command/AddTableCommand.java | 13 ++- .../tools/admin/command/AddTenantCommand.java | 5 +- .../tools/admin/command/BootstrapTableCommand.java | 6 +- .../tools/admin/command/ChangeTableState.java | 10 +- .../tools/admin/command/ImportDataCommand.java | 11 +- .../command/LaunchDataIngestionJobCommand.java | 5 +- .../admin/command/OperateClusterConfigCommand.java | 5 +- .../tools/admin/command/PostQueryCommand.java | 11 +- .../tools/admin/command/QuickstartRunner.java | 19 +++- .../tools/admin/command/UploadSegmentCommand.java | 14 ++- 43 files changed, 493 insertions(+), 196 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java new file mode 100644 index 0000000000..c8bac0593f --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.auth; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.Header; +import org.apache.http.message.BasicHeader; +import org.apache.pinot.spi.auth.AuthProvider; + + +/** + * Utility class to wrap inference of optimal auth provider from component configs. + */ +public final class AuthProviderUtils { + private AuthProviderUtils() { + // left blank + } + + /** + * Infer optimal auth provider based on the availability of static token, if any. + * + * @param authToken static auth token + * @return auth provider + */ + public static AuthProvider inferProvider(String authToken) { + return inferProvider(authToken, null); + } + + /** + * Infer optimal auth provider based on the availability of token and token url, if any. + * + * @param authToken static auth token + * @param authTokenUrl dynamic token URL + * @return auth provider + */ + public static AuthProvider inferProvider(String authToken, String authTokenUrl) { + if (StringUtils.isNotBlank(authTokenUrl)) { + return new UrlAuthProvider(authTokenUrl); + } + if (StringUtils.isNotBlank(authToken)) { + return new StaticTokenAuthProvider(authToken); + } + return new NullAuthProvider(); + } + + /** + * Resolve auth token right now, e.g. for job specs. + * + * @param authToken static auth token + * @param authTokenUrl dynamic token URL + * @return resolved static token + */ + public static String resolveToToken(String authToken, String authTokenUrl) { + return resolveToToken(inferProvider(authToken, authTokenUrl)); + } + + /** + * Resolve auth provider to token right now. + * + * @param authProvider + * @return + */ + public static String resolveToToken(AuthProvider authProvider) { + if (authProvider == null) { + return null; + } + return authProvider.getHttpHeaders().entrySet().stream().findFirst().map(Map.Entry::getValue) + .filter(Objects::nonNull).map(Object::toString).orElse(null); + } + + /** + * Convenience helper to convert Map to list of Http Headers + * @param headers header map + * @return list of http headers + */ + public static List<Header> toHeaders(@Nullable Map<String, Object> headers) { + if (headers == null) { + return Collections.emptyList(); + } + return headers.entrySet().stream().filter(entry -> Objects.nonNull(entry.getValue())) + .map(entry -> new BasicHeader(entry.getKey(), entry.getValue().toString())).collect(Collectors.toList()); + } + + /** + * Convenience helper to convert an optional authProvider to a list of http headers + * @param authProvider auth provider + * @return list of http headers + */ + public static List<Header> toHeaders(@Nullable AuthProvider authProvider) { + if (authProvider == null) { + return Collections.emptyList(); + } + return toHeaders(authProvider.getHttpHeaders()); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java b/pinot-common/src/main/java/org/apache/pinot/common/auth/NullAuthProvider.java similarity index 73% copy from pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java copy to pinot-common/src/main/java/org/apache/pinot/common/auth/NullAuthProvider.java index 5a9798c355..80756f3aae 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/auth/NullAuthProvider.java @@ -16,19 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.auth; +package org.apache.pinot.common.auth; -/** - * Container for all auth related info - */ -public class AuthContext { - private final String _authToken; +import java.util.Collections; +import java.util.Map; +import org.apache.pinot.spi.auth.AuthProvider; - public AuthContext(String authToken) { - _authToken = authToken; - } - public String getAuthToken() { - return _authToken; +public class NullAuthProvider implements AuthProvider { + @Override + public Map<String, Object> getHttpHeaders() { + return Collections.emptyMap(); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java b/pinot-common/src/main/java/org/apache/pinot/common/auth/StaticTokenAuthProvider.java similarity index 53% copy from pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java copy to pinot-common/src/main/java/org/apache/pinot/common/auth/StaticTokenAuthProvider.java index 5a9798c355..fe0406598d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/auth/StaticTokenAuthProvider.java @@ -16,19 +16,33 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.auth; +package org.apache.pinot.common.auth; -/** - * Container for all auth related info - */ -public class AuthContext { - private final String _authToken; +import java.util.Collections; +import java.util.Map; +import javax.ws.rs.core.HttpHeaders; +import org.apache.pinot.spi.auth.AuthProvider; + + +public class StaticTokenAuthProvider implements AuthProvider { + final String _header; + final String _prefix; + final String _token; + + public StaticTokenAuthProvider(String token) { + _header = HttpHeaders.AUTHORIZATION; + _prefix = ""; + _token = token; + } - public AuthContext(String authToken) { - _authToken = authToken; + public StaticTokenAuthProvider(String header, String prefix, String token) { + _header = header; + _prefix = prefix; + _token = token; } - public String getAuthToken() { - return _authToken; + @Override + public Map<String, Object> getHttpHeaders() { + return Collections.singletonMap(_header, _prefix + _token); } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/auth/UrlAuthProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/auth/UrlAuthProvider.java new file mode 100644 index 0000000000..0c8653453c --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/auth/UrlAuthProvider.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.auth; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; +import javax.ws.rs.core.HttpHeaders; +import org.apache.commons.io.IOUtils; +import org.apache.pinot.spi.auth.AuthProvider; + + +public class UrlAuthProvider implements AuthProvider { + final String _header; + final String _prefix; + final URL _url; + + public UrlAuthProvider(String url) { + try { + _header = HttpHeaders.AUTHORIZATION; + _prefix = "Bearer "; + _url = new URL(url); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } + + public UrlAuthProvider(String header, String prefix, String url) { + try { + _header = header; + _prefix = prefix; + _url = new URL(url); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public Map<String, Object> getHttpHeaders() { + try { + return Collections.singletonMap(_header, _prefix + IOUtils.toString(_url, StandardCharsets.UTF_8)); + } catch (IOException e) { + throw new IllegalArgumentException("Could not access auth url", e); + } + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index 1d10de2bca..10429387a5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -48,9 +48,11 @@ import org.apache.http.entity.mime.content.ContentBody; import org.apache.http.entity.mime.content.FileBody; import org.apache.http.entity.mime.content.InputStreamBody; import org.apache.http.message.BasicNameValuePair; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest; import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; @@ -402,23 +404,20 @@ public class FileUploadDownloadClient implements AutoCloseable { } private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs, - @Nullable String authToken) { + @Nullable AuthProvider authProvider) { RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1) .setHeader(HttpHeaders.CONTENT_TYPE, HttpClient.JSON_CONTENT_TYPE) .setEntity(new StringEntity(jsonRequestBody, ContentType.APPLICATION_JSON)); - if (StringUtils.isNotBlank(authToken)) { - requestBuilder.addHeader("Authorization", authToken); - } + AuthProviderUtils.toHeaders(authProvider).forEach(requestBuilder::addHeader); HttpClient.setTimeout(requestBuilder, socketTimeoutMs); return requestBuilder.build(); } - private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs, @Nullable String authToken) { + private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs, + @Nullable AuthProvider authProvider) { RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1) .setHeader(HttpHeaders.CONTENT_TYPE, HttpClient.JSON_CONTENT_TYPE); - if (StringUtils.isNotBlank(authToken)) { - requestBuilder.addHeader("Authorization", authToken); - } + AuthProviderUtils.toHeaders(authProvider).forEach(requestBuilder::addHeader); HttpClient.setTimeout(requestBuilder, socketTimeoutMs); return requestBuilder.build(); } @@ -856,17 +855,17 @@ public class FileUploadDownloadClient implements AutoCloseable { * * @param uri URI * @param startReplaceSegmentsRequest request - * @param authToken auth token + * @param authProvider auth provider * @return Response * @throws IOException * @throws HttpErrorStatusException */ public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest, - @Nullable String authToken) + @Nullable AuthProvider authProvider) throws IOException, HttpErrorStatusException { return HttpClient.wrapAndThrowHttpException(_httpClient.sendRequest( getStartReplaceSegmentsRequest(uri, JsonUtils.objectToString(startReplaceSegmentsRequest), - HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, authToken))); + HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, authProvider))); } /** @@ -874,15 +873,15 @@ public class FileUploadDownloadClient implements AutoCloseable { * * @param uri URI * @oaram socketTimeoutMs Socket timeout in milliseconds - * @param authToken auth token + * @param authProvider auth provider * @return Response * @throws IOException * @throws HttpErrorStatusException */ - public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs, @Nullable String authToken) + public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs, @Nullable AuthProvider authProvider) throws IOException, HttpErrorStatusException { return HttpClient.wrapAndThrowHttpException( - _httpClient.sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs, authToken))); + _httpClient.sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs, authProvider))); } /** @@ -940,7 +939,7 @@ public class FileUploadDownloadClient implements AutoCloseable { * * Download a file using default settings, with an optional auth token * - * @see HttpClient#downloadFile(URI, int, File, String, List) + * @see HttpClient#downloadFile(URI, int, File, AuthProvider, List) * * @param uri URI * @param socketTimeoutMs Socket timeout in milliseconds @@ -960,7 +959,7 @@ public class FileUploadDownloadClient implements AutoCloseable { * * Download a file. * - * @see FileUploadDownloadClient#downloadFile(URI, File, String) + * @see FileUploadDownloadClient#downloadFile(URI, File, AuthProvider) * * @param uri URI * @param dest File destination @@ -979,14 +978,14 @@ public class FileUploadDownloadClient implements AutoCloseable { * * @param uri URI * @param dest File destination - * @param authToken auth token + * @param authProvider auth provider * @return Response status code * @throws IOException * @throws HttpErrorStatusException */ - public int downloadFile(URI uri, File dest, String authToken) + public int downloadFile(URI uri, File dest, AuthProvider authProvider) throws IOException, HttpErrorStatusException { - return _httpClient.downloadFile(uri, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, dest, null, null); + return _httpClient.downloadFile(uri, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, dest, authProvider, null); } /** @@ -994,15 +993,15 @@ public class FileUploadDownloadClient implements AutoCloseable { * * @param uri URI * @param dest File destination - * @param authToken auth token + * @param authProvider auth provider * @param httpHeaders http headers * @return Response status code * @throws IOException * @throws HttpErrorStatusException */ - public int downloadFile(URI uri, File dest, String authToken, List<Header> httpHeaders) + public int downloadFile(URI uri, File dest, AuthProvider authProvider, List<Header> httpHeaders) throws IOException, HttpErrorStatusException { - return _httpClient.downloadFile(uri, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, dest, authToken, httpHeaders); + return _httpClient.downloadFile(uri, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, dest, authProvider, httpHeaders); } /** diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java index 497822ffb7..21479feece 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java @@ -22,6 +22,8 @@ import java.io.File; import java.net.URI; import java.util.List; import java.util.Random; +import org.apache.pinot.common.auth.AuthProviderUtils; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.retry.RetryPolicies; @@ -37,6 +39,7 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher { public static final String RETRY_WAIT_MS_CONFIG_KEY = "retry.wait.ms"; public static final String RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY = "retry.delay.scale.factor"; public static final String AUTH_TOKEN = CommonConstants.KEY_OF_AUTH_TOKEN; + public static final String AUTH_TOKEN_URL = CommonConstants.KEY_OF_AUTH_TOKEN_URL; public static final int DEFAULT_RETRY_COUNT = 3; public static final int DEFAULT_RETRY_WAIT_MS = 100; public static final int DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5; @@ -46,14 +49,14 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher { protected int _retryCount; protected int _retryWaitMs; protected int _retryDelayScaleFactor; - protected String _authToken; + protected AuthProvider _authProvider; @Override public void init(PinotConfiguration config) { _retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY, DEFAULT_RETRY_COUNT); _retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY, DEFAULT_RETRY_WAIT_MS); _retryDelayScaleFactor = config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, DEFAULT_RETRY_DELAY_SCALE_FACTOR); - _authToken = config.getProperty(AUTH_TOKEN); + _authProvider = AuthProviderUtils.inferProvider(config.getProperty(AUTH_TOKEN), config.getProperty(AUTH_TOKEN_URL)); doInit(config); _logger .info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount, _retryWaitMs, diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java index 1c8c9286d5..a2f31ad3bb 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java @@ -63,7 +63,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher { if (!InetAddresses.isInetAddress(hostName)) { httpHeaders.add(new BasicHeader(HttpHeaders.HOST, hostName + ":" + port)); } - int statusCode = _httpClient.downloadFile(uri, dest, _authToken, httpHeaders); + int statusCode = _httpClient.downloadFile(uri, dest, _authProvider, httpHeaders); _logger .info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(), statusCode); @@ -94,7 +94,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher { public void fetchSegmentToLocalWithoutRetry(URI uri, File dest) throws Exception { try { - int statusCode = _httpClient.downloadFile(uri, dest, _authToken); + int statusCode = _httpClient.downloadFile(uri, dest, _authProvider); _logger.info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(), statusCode); } catch (Exception e) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java index 743cd1b7d0..577991193f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java @@ -39,6 +39,7 @@ public class SegmentFetcherFactory { static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class"; private static final String PROTOCOLS_KEY = "protocols"; private static final String AUTH_TOKEN_KEY = CommonConstants.KEY_OF_AUTH_TOKEN; + private static final String AUTH_TOKEN_URL_KEY = CommonConstants.KEY_OF_AUTH_TOKEN_URL; private static final String ENCODED_SUFFIX = ".enc"; private static final Logger LOGGER = LoggerFactory.getLogger(SegmentFetcherFactory.class); @@ -91,10 +92,15 @@ public class SegmentFetcherFactory { } String authToken = config.getProperty(AUTH_TOKEN_KEY); + String authTokenUrl = config.getProperty(AUTH_TOKEN_URL_KEY); + Map<String, Object> subConfigMap = config.subset(protocol).toMap(); if (!subConfigMap.containsKey(AUTH_TOKEN_KEY) && StringUtils.isNotBlank(authToken)) { subConfigMap.put(AUTH_TOKEN_KEY, authToken); } + if (!subConfigMap.containsKey(AUTH_TOKEN_URL_KEY) && StringUtils.isNotBlank(authTokenUrl)) { + subConfigMap.put(AUTH_TOKEN_URL_KEY, authTokenUrl); + } segmentFetcher.init(new PinotConfiguration(subConfigMap)); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java index 62948d2921..2327066656 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java @@ -35,7 +35,6 @@ import javax.net.ssl.SSLContext; import org.apache.commons.codec.binary.Base64; import org.apache.commons.collections.MapUtils; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; @@ -55,12 +54,13 @@ import org.apache.http.entity.StringEntity; import org.apache.http.entity.mime.MultipartEntityBuilder; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; -import org.apache.http.message.BasicHeader; import org.apache.http.util.EntityUtils; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.utils.SimpleHttpErrorInfo; import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.TlsUtils; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; import org.slf4j.Logger; @@ -110,7 +110,7 @@ public class HttpClient implements AutoCloseable { /** * Deprecated due to lack of auth header support. May break for deployments with auth enabled * - * @see #sendGetRequest(URI, Map, String) + * @see #sendGetRequest(URI, Map, AuthProvider) */ public SimpleHttpResponse sendGetRequest(URI uri) throws IOException { @@ -122,17 +122,16 @@ public class HttpClient implements AutoCloseable { return sendGetRequest(uri, headers, null); } - public SimpleHttpResponse sendGetRequest(URI uri, @Nullable Map<String, String> headers, @Nullable String authToken) + public SimpleHttpResponse sendGetRequest(URI uri, @Nullable Map<String, String> headers, + @Nullable AuthProvider authProvider) throws IOException { RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1); + AuthProviderUtils.toHeaders(authProvider).forEach(requestBuilder::addHeader); if (MapUtils.isNotEmpty(headers)) { for (Map.Entry<String, String> header : headers.entrySet()) { requestBuilder.addHeader(header.getKey(), header.getValue()); } } - if (StringUtils.isNotBlank(authToken)) { - requestBuilder.addHeader(AUTH_HTTP_HEADER, authToken); - } setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS); return sendRequest(requestBuilder.build()); } @@ -140,7 +139,7 @@ public class HttpClient implements AutoCloseable { /** * Deprecated due to lack of auth header support. May break for deployments with auth enabled * - * @see #sendDeleteRequest(URI, Map, String) + * @see #sendDeleteRequest(URI, Map, AuthProvider) */ public SimpleHttpResponse sendDeleteRequest(URI uri) throws IOException { @@ -153,12 +152,10 @@ public class HttpClient implements AutoCloseable { } public SimpleHttpResponse sendDeleteRequest(URI uri, @Nullable Map<String, String> headers, - @Nullable String authToken) + @Nullable AuthProvider authProvider) throws IOException { RequestBuilder requestBuilder = RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1); - if (StringUtils.isNotBlank(authToken)) { - requestBuilder.addHeader(AUTH_HTTP_HEADER, authToken); - } + AuthProviderUtils.toHeaders(authProvider).forEach(requestBuilder::addHeader); if (MapUtils.isNotEmpty(headers)) { for (Map.Entry<String, String> header : headers.entrySet()) { requestBuilder.addHeader(header.getKey(), header.getValue()); @@ -171,7 +168,7 @@ public class HttpClient implements AutoCloseable { /** * Deprecated due to lack of auth header support. May break for deployments with auth enabled * - * @see #sendPostRequest(URI, HttpEntity, Map, String) + * @see #sendPostRequest(URI, HttpEntity, Map, AuthProvider) */ public SimpleHttpResponse sendPostRequest(URI uri, @Nullable HttpEntity payload, @Nullable Map<String, String> headers) @@ -180,15 +177,13 @@ public class HttpClient implements AutoCloseable { } public SimpleHttpResponse sendPostRequest(URI uri, @Nullable HttpEntity payload, - @Nullable Map<String, String> headers, @Nullable String authToken) + @Nullable Map<String, String> headers, @Nullable AuthProvider authProvider) throws IOException { RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1); if (payload != null) { requestBuilder.setEntity(payload); } - if (StringUtils.isNotBlank(authToken)) { - requestBuilder.addHeader(AUTH_HTTP_HEADER, authToken); - } + AuthProviderUtils.toHeaders(authProvider).forEach(requestBuilder::addHeader); if (MapUtils.isNotEmpty(headers)) { for (Map.Entry<String, String> header : headers.entrySet()) { requestBuilder.addHeader(header.getKey(), header.getValue()); @@ -201,7 +196,7 @@ public class HttpClient implements AutoCloseable { /** * Deprecated due to lack of auth header support. May break for deployments with auth enabled * - * @see #sendPutRequest(URI, HttpEntity, Map, String) + * @see #sendPutRequest(URI, HttpEntity, Map, AuthProvider) */ public SimpleHttpResponse sendPutRequest(URI uri, @Nullable HttpEntity payload, @Nullable Map<String, String> headers) throws IOException { @@ -209,15 +204,13 @@ public class HttpClient implements AutoCloseable { } public SimpleHttpResponse sendPutRequest(URI uri, @Nullable HttpEntity payload, @Nullable Map<String, String> headers, - @Nullable String authToken) + @Nullable AuthProvider authProvider) throws IOException { RequestBuilder requestBuilder = RequestBuilder.put(uri).setVersion(HttpVersion.HTTP_1_1); if (payload != null) { requestBuilder.setEntity(payload); } - if (StringUtils.isNotBlank(authToken)) { - requestBuilder.addHeader(AUTH_HTTP_HEADER, authToken); - } + AuthProviderUtils.toHeaders(authProvider).forEach(requestBuilder::addHeader); if (MapUtils.isNotEmpty(headers)) { for (Map.Entry<String, String> header : headers.entrySet()) { requestBuilder.addHeader(header.getKey(), header.getValue()); @@ -243,13 +236,13 @@ public class HttpClient implements AutoCloseable { } public SimpleHttpResponse sendJsonPostRequest(URI uri, @Nullable String jsonRequestBody, - @Nullable Map<String, String> headers, @Nullable String authToken) + @Nullable Map<String, String> headers, @Nullable AuthProvider authProvider) throws IOException { Map<String, String> headersWrapper = MapUtils.isEmpty(headers) ? new HashMap<>() : new HashMap<>(headers); headersWrapper.put(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE); HttpEntity entity = jsonRequestBody == null ? null : new StringEntity(jsonRequestBody, ContentType.APPLICATION_JSON); - return sendPostRequest(uri, entity, headers, authToken); + return sendPostRequest(uri, entity, headers, authProvider); } public SimpleHttpResponse sendJsonPutRequest(URI uri, @Nullable String jsonRequestBody) @@ -264,13 +257,13 @@ public class HttpClient implements AutoCloseable { } public SimpleHttpResponse sendJsonPutRequest(URI uri, @Nullable String jsonRequestBody, - @Nullable Map<String, String> headers, @Nullable String authToken) + @Nullable Map<String, String> headers, @Nullable AuthProvider authProvider) throws IOException { Map<String, String> headersWrapper = MapUtils.isEmpty(headers) ? new HashMap<>() : new HashMap<>(headers); headersWrapper.put(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE); HttpEntity entity = jsonRequestBody == null ? null : new StringEntity(jsonRequestBody, ContentType.APPLICATION_JSON); - return sendPutRequest(uri, entity, headersWrapper, authToken); + return sendPutRequest(uri, entity, headersWrapper, authProvider); } // -------------------------------------------------------------------------- @@ -366,15 +359,15 @@ public class HttpClient implements AutoCloseable { * @param uri URI * @param socketTimeoutMs Socket timeout in milliseconds * @param dest File destination - * @param authToken auth token + * @param authProvider auth provider * @param httpHeaders http headers * @return Response status code * @throws IOException * @throws HttpErrorStatusException */ - public int downloadFile(URI uri, int socketTimeoutMs, File dest, String authToken, List<Header> httpHeaders) + public int downloadFile(URI uri, int socketTimeoutMs, File dest, AuthProvider authProvider, List<Header> httpHeaders) throws IOException, HttpErrorStatusException { - HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs, authToken, httpHeaders); + HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs, authProvider, httpHeaders); try (CloseableHttpResponse response = _httpClient.execute(request)) { StatusLine statusLine = response.getStatusLine(); int statusCode = statusLine.getStatusCode(); @@ -433,19 +426,6 @@ public class HttpClient implements AutoCloseable { requestBuilder.setConfig(requestConfig); } - /** - * Generate an (optional) HTTP Authorization header given an auth token. - * - * @param authToken auth token - * @return list of 0 or 1 "Authorization" headers - */ - public static List<Header> makeAuthHeader(String authToken) { - if (StringUtils.isBlank(authToken)) { - return Collections.emptyList(); - } - return Collections.singletonList(new BasicHeader(AUTH_HTTP_HEADER, authToken)); - } - private static String getErrorMessage(HttpUriRequest request, CloseableHttpResponse response) { String controllerHost = null; String controllerVersion = null; @@ -470,12 +450,10 @@ public class HttpClient implements AutoCloseable { return errorMessage; } - private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs, String authToken, + private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs, AuthProvider authProvider, List<Header> httpHeaders) { RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1); - if (org.apache.commons.lang.StringUtils.isNotBlank(authToken)) { - requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, authToken); - } + AuthProviderUtils.toHeaders(authProvider).forEach(requestBuilder::addHeader); HttpClient.setTimeout(requestBuilder, socketTimeoutMs); String userInfo = uri.getUserInfo(); if (userInfo != null) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AuthenticationFilter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AuthenticationFilter.java index 35733dcf78..fb4226777d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AuthenticationFilter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AuthenticationFilter.java @@ -48,7 +48,8 @@ import org.glassfish.grizzly.http.server.Request; @javax.ws.rs.ext.Provider public class AuthenticationFilter implements ContainerRequestFilter { private static final Set<String> UNPROTECTED_PATHS = - new HashSet<>(Arrays.asList("", "help", "auth/info", "auth/verify", "health")); + new HashSet<>(Arrays.asList("", "help", "org/apache/pinot/common/auth/info", + "org/apache/pinot/common/auth/verify", "health")); @Inject Provider<Request> _requestProvider; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java index f35ca49160..bc332e0678 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java @@ -37,6 +37,7 @@ import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.access.AccessType; import org.apache.pinot.controller.api.access.Authenticate; @@ -44,6 +45,7 @@ import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.util.FileIngestionHelper; import org.apache.pinot.controller.util.FileIngestionHelper.DataPayload; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; @@ -196,13 +198,16 @@ public class PinotIngestionRestletResource { FileIngestionHelper fileIngestionHelper = new FileIngestionHelper(tableConfig, schema, batchConfigMap, getControllerUri(), - new File(_controllerConf.getDataDir(), UPLOAD_DIR), getAuthToken()); + new File(_controllerConf.getDataDir(), UPLOAD_DIR), getAuthProvider()); return fileIngestionHelper.buildSegmentAndPush(payload); } - private String getAuthToken() { - return _controllerConf + private AuthProvider getAuthProvider() { + String authToken = _controllerConf .getProperty(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY + ".auth.token"); + String authTokenUrl = _controllerConf + .getProperty(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY + ".auth.token.url"); + return AuthProviderUtils.inferProvider(authToken, authTokenUrl); } private URI getControllerUri() { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java index 92da871460..961240edbb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java @@ -35,7 +35,7 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.controller.api.resources.SuccessResponse; import org.apache.pinot.segment.local.utils.IngestionUtils; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; -import org.apache.pinot.spi.auth.AuthContext; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; @@ -71,16 +71,16 @@ public class FileIngestionHelper { private final Map<String, String> _batchConfigMap; private final URI _controllerUri; private final File _uploadDir; - private final AuthContext _authContext; + private final AuthProvider _authProvider; public FileIngestionHelper(TableConfig tableConfig, Schema schema, Map<String, String> batchConfigMap, - URI controllerUri, File uploadDir, String authToken) { + URI controllerUri, File uploadDir, AuthProvider authProvider) { _tableConfig = tableConfig; _schema = schema; _batchConfigMap = batchConfigMap; _controllerUri = controllerUri; _uploadDir = uploadDir; - _authContext = new AuthContext(authToken); + _authProvider = authProvider; } /** @@ -154,7 +154,7 @@ public class FileIngestionHelper { .setIngestionConfig(ingestionConfigOverride).build(); SegmentUploader segmentUploader = PluginManager.get().createInstance(SEGMENT_UPLOADER_CLASS); segmentUploader.init(tableConfigOverride); - segmentUploader.uploadSegment(segmentTarFile.toURI(), _authContext); + segmentUploader.uploadSegment(segmentTarFile.toURI(), _authProvider); LOGGER.info("Uploaded tar: {} to table: {}", segmentTarFile.getAbsolutePath(), tableNameWithType); return new SuccessResponse( diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 79d1741fb6..9c8b301275 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -36,6 +36,7 @@ import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerGauge; @@ -58,6 +59,7 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; @@ -83,7 +85,7 @@ public abstract class BaseTableDataManager implements TableDataManager { protected File _resourceTmpDir; protected Logger _logger; protected HelixManager _helixManager; - protected String _authToken; + protected AuthProvider _authProvider; // Fixed size LRU cache with TableName - SegmentName pair as key, and segment related // errors as the value. @@ -100,7 +102,8 @@ public abstract class BaseTableDataManager implements TableDataManager { _propertyStore = propertyStore; _serverMetrics = serverMetrics; _helixManager = helixManager; - _authToken = tableDataManagerConfig.getAuthToken(); + _authProvider = AuthProviderUtils.inferProvider(tableDataManagerConfig.getAuthToken(), + tableDataManagerConfig.getAuthTokenUrl()); _tableNameWithType = tableDataManagerConfig.getTableName(); _tableDataDir = tableDataManagerConfig.getDataDir(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java index 2e4a10ee2c..30e4d1d92f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java @@ -64,7 +64,7 @@ public class SegmentCommitterFactory { segmentUploader = new Server2ControllerSegmentUploader(_logger, _protocolHandler.getFileUploadDownloadClient(), _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), params.getSegmentName(), ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics, - _protocolHandler.getAuthToken()); + _protocolHandler.getAuthProvider()); return new SplitSegmentCommitter(_logger, _protocolHandler, params, segmentUploader); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java index 50c06fe011..023e60399e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java @@ -21,13 +21,14 @@ package org.apache.pinot.core.data.manager.realtime; import java.io.File; import java.net.URI; import java.net.URISyntaxException; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.core.util.SegmentCompletionProtocolUtils; import org.apache.pinot.server.realtime.ControllerLeaderLocator; +import org.apache.pinot.spi.auth.AuthProvider; import org.slf4j.Logger; @@ -40,11 +41,11 @@ public class Server2ControllerSegmentUploader implements SegmentUploader { private final String _segmentName; private final int _segmentUploadRequestTimeoutMs; private final ServerMetrics _serverMetrics; - private final String _authToken; + private final AuthProvider _authProvider; public Server2ControllerSegmentUploader(Logger segmentLogger, FileUploadDownloadClient fileUploadDownloadClient, String controllerSegmentUploadCommitUrl, String segmentName, int segmentUploadRequestTimeoutMs, - ServerMetrics serverMetrics, String authToken) + ServerMetrics serverMetrics, AuthProvider authProvider) throws URISyntaxException { _segmentLogger = segmentLogger; _fileUploadDownloadClient = fileUploadDownloadClient; @@ -52,7 +53,7 @@ public class Server2ControllerSegmentUploader implements SegmentUploader { _segmentName = segmentName; _segmentUploadRequestTimeoutMs = segmentUploadRequestTimeoutMs; _serverMetrics = serverMetrics; - _authToken = authToken; + _authProvider = authProvider; } @Override @@ -73,7 +74,7 @@ public class Server2ControllerSegmentUploader implements SegmentUploader { try { String responseStr = _fileUploadDownloadClient .uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName, segmentFile, - HttpClient.makeAuthHeader(_authToken), null, _segmentUploadRequestTimeoutMs).getResponse(); + AuthProviderUtils.toHeaders(_authProvider), null, _segmentUploadRequestTimeoutMs).getResponse(); response = SegmentCompletionProtocol.Response.fromJsonString(responseStr); _segmentLogger.info("Controller response {} for {}", response.toJsonString(), _controllerSegmentUploadCommitUrl); if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) { diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java index eede8569c9..cd5275d367 100644 --- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java @@ -23,14 +23,15 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Map; import javax.net.ssl.SSLContext; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.ClientSSLContextGenerator; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.Pair; -import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.core.data.manager.realtime.Server2ControllerSegmentUploader; import org.apache.pinot.core.util.SegmentCompletionProtocolUtils; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -53,7 +54,7 @@ public class ServerSegmentCompletionProtocolHandler { private static SSLContext _sslContext; private static Integer _controllerHttpsPort; private static int _segmentUploadRequestTimeoutMs; - private static String _authToken; + private static AuthProvider _authProvider; private static String _protocol = HTTP_PROTOCOL; private final FileUploadDownloadClient _fileUploadDownloadClient; @@ -73,7 +74,9 @@ public class ServerSegmentCompletionProtocolHandler { _protocol = uploaderConfig.getProperty(CONFIG_OF_PROTOCOL, HTTP_PROTOCOL); _segmentUploadRequestTimeoutMs = uploaderConfig .getProperty(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS); - _authToken = uploaderConfig.getProperty(CONFIG_OF_SEGMENT_UPLOADER_AUTH_TOKEN); + + _authProvider = AuthProviderUtils.inferProvider(uploaderConfig.getProperty(CONFIG_OF_SEGMENT_UPLOADER_AUTH_TOKEN), + uploaderConfig.getProperty(CONFIG_OF_SEGMENT_UPLOADER_AUTH_TOKEN_URL)); } public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics, String tableNameWithType) { @@ -90,8 +93,8 @@ public class ServerSegmentCompletionProtocolHandler { return _fileUploadDownloadClient; } - public String getAuthToken() { - return _authToken; + public AuthProvider getAuthProvider() { + return _authProvider; } public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) { @@ -157,7 +160,7 @@ public class ServerSegmentCompletionProtocolHandler { try { segmentUploader = new Server2ControllerSegmentUploader(LOGGER, _fileUploadDownloadClient, url, params.getSegmentName(), - _segmentUploadRequestTimeoutMs, _serverMetrics, _authToken); + _segmentUploadRequestTimeoutMs, _serverMetrics, _authProvider); } catch (URISyntaxException e) { LOGGER.error("Segment commit upload url error: ", e); return SegmentCompletionProtocol.RESP_NOT_SENT; @@ -212,7 +215,7 @@ public class ServerSegmentCompletionProtocolHandler { SegmentCompletionProtocol.Response response; try { String responseStr = _fileUploadDownloadClient - .sendSegmentCompletionProtocolRequest(new URI(url), HttpClient.makeAuthHeader(_authToken), null, + .sendSegmentCompletionProtocolRequest(new URI(url), AuthProviderUtils.toHeaders(_authProvider), null, DEFAULT_OTHER_REQUESTS_TIMEOUT).getResponse(); response = SegmentCompletionProtocol.Response.fromJsonString(responseStr); LOGGER.info("Controller response {} for {}", response.toJsonString(), url); @@ -237,7 +240,7 @@ public class ServerSegmentCompletionProtocolHandler { SegmentCompletionProtocol.Response response; try { String responseStr = _fileUploadDownloadClient - .uploadSegmentMetadataFiles(new URI(url), metadataFiles, HttpClient.makeAuthHeader(_authToken), + .uploadSegmentMetadataFiles(new URI(url), metadataFiles, AuthProviderUtils.toHeaders(_authProvider), null, _segmentUploadRequestTimeoutMs).getResponse(); response = SegmentCompletionProtocol.Response.fromJsonString(responseStr); LOGGER.info("Controller response {} for {}", response.toJsonString(), url); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java index fafa7cfd42..3332f9d257 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java @@ -158,7 +158,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest { IOUtils .write(jobFileContents.replaceAll("9000", String.valueOf(getControllerPort())), new FileOutputStream(jobFile)); - new BootstrapTableTool("http", "localhost", getControllerPort(), baseDir.getAbsolutePath(), AUTH_TOKEN).execute(); + new BootstrapTableTool("http", "localhost", getControllerPort(), baseDir.getAbsolutePath(), + AUTH_TOKEN, null).execute(); Thread.sleep(5000); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java index 3ea7c7d28f..0ce4b7177c 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java @@ -33,16 +33,18 @@ import org.apache.http.Header; import org.apache.http.NameValuePair; import org.apache.http.message.BasicHeader; import org.apache.http.message.BasicNameValuePair; +import org.apache.pinot.common.auth.AuthProviderUtils; +import org.apache.pinot.common.auth.StaticTokenAuthProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; -import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.minion.MinionConf; import org.apache.pinot.minion.exception.TaskCancelledException; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,7 +107,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe .collect(Collectors.toList()); String lineageEntryId = SegmentConversionUtils.startSegmentReplace(context.getTableNameWithType(), context.getUploadURL(), - new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), context.getAuthToken()); + new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), context.getAuthProvider()); context.setCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID, lineageEntryId); } } @@ -116,7 +118,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe if (context.isReplaceSegmentsEnabled()) { String lineageEntryId = (String) context.getCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID); SegmentConversionUtils.endSegmentReplace(context.getTableNameWithType(), context.getUploadURL(), lineageEntryId, - _minionConf.getEndReplaceSegmentsTimeoutMs(), context.getAuthToken()); + _minionConf.getEndReplaceSegmentsTimeoutMs(), context.getAuthProvider()); } } @@ -132,7 +134,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY); String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR); String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY); - String authToken = configs.get(MinionConstants.AUTH_TOKEN); + AuthProvider authProvider = AuthProviderUtils.inferProvider(configs.get(MinionConstants.AUTH_TOKEN)); LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType, tableNameWithType, inputSegmentNames, downloadURLString, uploadURL); @@ -215,7 +217,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe List<Header> httpHeaders = new ArrayList<>(); httpHeaders.add(segmentZKMetadataCustomMapModifierHeader); - httpHeaders.addAll(HttpClient.makeAuthHeader(authToken)); + httpHeaders.addAll(AuthProviderUtils.toHeaders(authProvider.getHttpHeaders())); // Set parameters for upload request NameValuePair enableParallelPushProtectionParameter = @@ -255,7 +257,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe private final String _tableNameWithType; private final String _uploadURL; - private final String _authToken; + private final AuthProvider _authProvider; private final String _inputSegmentNames; private final boolean _replaceSegmentsEnabled; private final Map<String, Object> _customMap; @@ -268,7 +270,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe Map<String, String> configs = pinotTaskConfig.getConfigs(); _tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); _uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY); - _authToken = configs.get(MinionConstants.AUTH_TOKEN); + _authProvider = new StaticTokenAuthProvider(configs.get(MinionConstants.AUTH_TOKEN)); _inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY); String replaceSegmentsString = configs.get(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY); _replaceSegmentsEnabled = Boolean.parseBoolean(replaceSegmentsString); @@ -291,8 +293,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe return _uploadURL; } - public String getAuthToken() { - return _authToken; + public AuthProvider getAuthProvider() { + return _authProvider; } public String getInputSegmentNames() { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java index 730f5f1d05..e8eb4e1ba3 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java @@ -31,14 +31,15 @@ import org.apache.http.HttpHeaders; import org.apache.http.NameValuePair; import org.apache.http.message.BasicHeader; import org.apache.http.message.BasicNameValuePair; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; -import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.minion.exception.TaskCancelledException; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +70,7 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut String downloadURL = configs.get(MinionConstants.DOWNLOAD_URL_KEY); String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY); String originalSegmentCrc = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY); - String authToken = configs.get(MinionConstants.AUTH_TOKEN); + AuthProvider authProvider = AuthProviderUtils.inferProvider(configs.get(MinionConstants.AUTH_TOKEN)); long currentSegmentCrc = getSegmentCrc(tableNameWithType, segmentName); if (Long.parseLong(originalSegmentCrc) != currentSegmentCrc) { @@ -150,7 +151,7 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut httpHeaders.add(ifMatchHeader); httpHeaders.add(refreshOnlyHeader); httpHeaders.add(segmentZKMetadataCustomMapModifierHeader); - httpHeaders.addAll(HttpClient.makeAuthHeader(authToken)); + httpHeaders.addAll(AuthProviderUtils.toHeaders(authProvider)); // Set parameters for upload request. NameValuePair enableParallelPushProtectionParameter = diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java index cf07398325..d6b1cd6454 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java @@ -38,6 +38,7 @@ import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.minion.MinionContext; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -124,13 +125,14 @@ public class SegmentConversionUtils { } public static String startSegmentReplace(String tableNameWithType, String uploadURL, - StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable String authToken) + StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable AuthProvider authProvider) throws Exception { - return startSegmentReplace(tableNameWithType, uploadURL, startReplaceSegmentsRequest, authToken, true); + return startSegmentReplace(tableNameWithType, uploadURL, startReplaceSegmentsRequest, authProvider, true); } public static String startSegmentReplace(String tableNameWithType, String uploadURL, - StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable String authToken, boolean forceCleanup) + StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable AuthProvider authProvider, + boolean forceCleanup) throws Exception { String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); @@ -139,17 +141,18 @@ public class SegmentConversionUtils { URI uri = FileUploadDownloadClient .getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), forceCleanup); SimpleHttpResponse response = - fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest, authToken); + fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest, authProvider); String responseString = response.getResponse(); LOGGER.info( - "Got response {}: {} while sending start replace segment request for table: {}, uploadURL: {}, request: {}", + "Got response {}: {} while sending start replace segment reBaseSingleSegmentConversionExecutorquest for " + + "table: {}, uploadURL: {}, request: {}", response.getStatusCode(), responseString, tableNameWithType, uploadURL, startReplaceSegmentsRequest); return JsonUtils.stringToJsonNode(responseString).get("segmentLineageEntryId").asText(); } } public static void endSegmentReplace(String tableNameWithType, String uploadURL, String segmentLineageEntryId, - int socketTimeoutMs, @Nullable String authToken) + int socketTimeoutMs, @Nullable AuthProvider authProvider) throws Exception { String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); @@ -157,7 +160,7 @@ public class SegmentConversionUtils { try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) { URI uri = FileUploadDownloadClient .getEndReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), segmentLineageEntryId); - SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs, authToken); + SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs, authProvider); LOGGER.info("Got response {}: {} while sending end replace segment request for table: {}, uploadURL: {}", response.getStatusCode(), response.getResponse(), tableNameWithType, uploadURL); } diff --git a/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java index c836e49924..81f538c7f7 100644 --- a/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java +++ b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java @@ -29,7 +29,7 @@ import javax.annotation.Nullable; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.segment.local.utils.IngestionUtils; -import org.apache.pinot.spi.auth.AuthContext; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.ingestion.batch.BatchConfig; @@ -86,15 +86,15 @@ public class SegmentUploaderDefault implements SegmentUploader { } @Override - public void uploadSegment(URI segmentTarURI, @Nullable AuthContext authContext) + public void uploadSegment(URI segmentTarURI, @Nullable AuthProvider authProvider) throws Exception { IngestionUtils - .uploadSegment(_tableNameWithType, _batchConfig, Collections.singletonList(segmentTarURI), authContext); + .uploadSegment(_tableNameWithType, _batchConfig, Collections.singletonList(segmentTarURI), authProvider); LOGGER.info("Successfully uploaded segment: {} to table: {}", segmentTarURI, _tableNameWithType); } @Override - public void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthContext authContext) + public void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthProvider authProvider) throws Exception { List<URI> segmentTarURIs = new ArrayList<>(); @@ -106,7 +106,7 @@ public class SegmentUploaderDefault implements SegmentUploader { segmentTarURIs.add(uri); } } - IngestionUtils.uploadSegment(_tableNameWithType, _batchConfig, segmentTarURIs, authContext); + IngestionUtils.uploadSegment(_tableNameWithType, _batchConfig, segmentTarURIs, authProvider); LOGGER.info("Successfully uploaded segments: {} to table: {}", segmentTarURIs, _tableNameWithType); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java index f393338323..e085a72abf 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java @@ -37,6 +37,7 @@ public class TableDataManagerConfig { private static final String TABLE_DATA_MANAGER_NAME = "name"; private static final String TABLE_IS_DIMENSION = "isDimTable"; private static final String TABLE_DATA_MANGER_AUTH_TOKEN = "authToken"; + private static final String TABLE_DATA_MANGER_AUTH_TOKEN_URL = "authTokenUrl"; private final Configuration _tableDataManagerConfig; @@ -72,6 +73,10 @@ public class TableDataManagerConfig { return _tableDataManagerConfig.getString(TABLE_DATA_MANGER_AUTH_TOKEN); } + public String getAuthTokenUrl() { + return _tableDataManagerConfig.getString(TABLE_DATA_MANGER_AUTH_TOKEN_URL); + } + public static TableDataManagerConfig getDefaultHelixTableDataManagerConfig( InstanceDataManagerConfig instanceDataManagerConfig, String tableNameWithType) { Configuration defaultConfig = new PropertiesConfiguration(); @@ -83,6 +88,7 @@ public class TableDataManagerConfig { Preconditions.checkNotNull(tableType); defaultConfig.addProperty(TABLE_DATA_MANAGER_TYPE, tableType.name()); defaultConfig.addProperty(TABLE_DATA_MANGER_AUTH_TOKEN, instanceDataManagerConfig.getAuthToken()); + defaultConfig.addProperty(TABLE_DATA_MANGER_AUTH_TOKEN_URL, instanceDataManagerConfig.getAuthTokenUrl()); return new TableDataManagerConfig(defaultConfig); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java index 0e38807cc5..a67c0e6151 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; @@ -38,7 +39,7 @@ import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator; -import org.apache.pinot.spi.auth.AuthContext; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; @@ -188,13 +189,14 @@ public final class IngestionUtils { * @param tableNameWithType name of the table to upload the segment * @param batchConfig batchConfig with details about push such as controllerURI, pushAttempts, pushParallelism, etc * @param segmentTarURIs list of URI for the segment tar files - * @param authContext auth details required to upload the Pinot segment to controller + * @param authProvider auth provider */ public static void uploadSegment(String tableNameWithType, BatchConfig batchConfig, List<URI> segmentTarURIs, - @Nullable AuthContext authContext) + @Nullable AuthProvider authProvider) throws Exception { - SegmentGenerationJobSpec segmentUploadSpec = generateSegmentUploadSpec(tableNameWithType, batchConfig, authContext); + SegmentGenerationJobSpec segmentUploadSpec = generateSegmentUploadSpec(tableNameWithType, batchConfig, + authProvider); List<String> segmentTarURIStrs = segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList()); String pushMode = batchConfig.getPushMode(); @@ -249,7 +251,7 @@ public final class IngestionUtils { } private static SegmentGenerationJobSpec generateSegmentUploadSpec(String tableName, BatchConfig batchConfig, - @Nullable AuthContext authContext) { + @Nullable AuthProvider authProvider) { TableSpec tableSpec = new TableSpec(); tableSpec.setTableName(tableName); @@ -269,8 +271,8 @@ public final class IngestionUtils { spec.setPushJobSpec(pushJobSpec); spec.setTableSpec(tableSpec); spec.setPinotClusterSpecs(pinotClusterSpecs); - if (authContext != null && StringUtils.isNotBlank(authContext.getAuthToken())) { - spec.setAuthToken(authContext.getAuthToken()); + if (authProvider != null) { + spec.setAuthToken(AuthProviderUtils.resolveToToken(authProvider)); } return spec; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java index 6a8a349593..68067f34f1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java @@ -37,12 +37,14 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; import org.apache.http.message.BasicHeader; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; @@ -105,6 +107,7 @@ public class SegmentPushUtils implements Serializable { String fileName = tarFile.getName(); Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT)); String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()); + AuthProvider authProvider = AuthProviderUtils.inferProvider(spec.getAuthToken()); for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) { URI controllerURI; try { @@ -125,7 +128,7 @@ public class SegmentPushUtils implements Serializable { try (InputStream inputStream = fileSystem.open(tarFileURI)) { SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), - segmentName, inputStream, HttpClient.makeAuthHeader(spec.getAuthToken()), + segmentName, inputStream, AuthProviderUtils.toHeaders(authProvider), FileUploadDownloadClient.makeTableParam(tableName), tableName, tableType); LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName, controllerURI, response.getStatusCode(), response.getResponse()); @@ -162,6 +165,7 @@ public class SegmentPushUtils implements Serializable { for (String segmentUri : segmentUris) { URI segmentURI = URI.create(segmentUri); PinotFS outputDirFS = PinotFSFactory.create(segmentURI.getScheme()); + AuthProvider authProvider = AuthProviderUtils.inferProvider(spec.getAuthToken()); for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) { URI controllerURI; try { @@ -182,8 +186,7 @@ public class SegmentPushUtils implements Serializable { try { SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT .sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentUri, - HttpClient.makeAuthHeader(spec.getAuthToken()), - FileUploadDownloadClient.makeTableParam(tableName), + AuthProviderUtils.toHeaders(authProvider), FileUploadDownloadClient.makeTableParam(tableName), HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); LOGGER.info("Response for pushing table {} segment uri {} to location {} - {}: {}", tableName, segmentUri, controllerURI, response.getStatusCode(), response.getResponse()); @@ -237,6 +240,7 @@ public class SegmentPushUtils implements Serializable { Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT)); String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()); File segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath)); + AuthProvider authProvider = AuthProviderUtils.inferProvider(spec.getAuthToken()); try { for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) { URI controllerURI; @@ -260,7 +264,7 @@ public class SegmentPushUtils implements Serializable { headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath)); headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.METADATA.toString())); - headers.addAll(HttpClient.makeAuthHeader(spec.getAuthToken())); + headers.addAll(AuthProviderUtils.toHeaders(authProvider)); SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT .uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName, diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java index 809a86642d..14b193e4dd 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java @@ -64,6 +64,8 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig public static final String INSTANCE_RELOAD_CONSUMING_SEGMENT = "reload.consumingSegment"; // Key of the auth token public static final String AUTH_TOKEN = "auth.token"; + // Key of the auth token url + public static final String AUTH_TOKEN_URL = "auth.token.url"; // Key of segment directory loader public static final String SEGMENT_DIRECTORY_LOADER = "segment.directory.loader"; @@ -216,6 +218,11 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig return _instanceDataManagerConfiguration.getProperty(AUTH_TOKEN); } + @Override + public String getAuthTokenUrl() { + return _instanceDataManagerConfiguration.getProperty(AUTH_TOKEN_URL); + } + @Override public String getSegmentDirectoryLoader() { return _instanceDataManagerConfiguration.getProperty(SEGMENT_DIRECTORY_LOADER, diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthProvider.java similarity index 63% rename from pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthProvider.java index 5a9798c355..1ecbcc1188 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthProvider.java @@ -18,17 +18,16 @@ */ package org.apache.pinot.spi.auth; -/** - * Container for all auth related info - */ -public class AuthContext { - private final String _authToken; +import java.util.Map; - public AuthContext(String authToken) { - _authToken = authToken; - } - public String getAuthToken() { - return _authToken; - } +/** + * Pluggable auth provider interface to augment authentication information in requests issued by pinot. + * + * Comes with several default implementation, including noop, static tokens, and token loaded from external urls. + * The purpose of AuthProvider is enabling dynamic reconfiguration of pinot's internal auth tokens, for example with + * expiring JWTs and other token rotation mechanisms. + */ +public interface AuthProvider { + Map<String, Object> getHttpHeaders(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java index 7718930657..422add0b37 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java @@ -55,6 +55,8 @@ public interface InstanceDataManagerConfig { String getAuthToken(); + String getAuthTokenUrl(); + String getSegmentDirectoryLoader(); long getErrorCacheSize(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java index 15fc9fbcd8..145f443a4a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java @@ -124,6 +124,12 @@ public class SegmentGenerationJobSpec implements Serializable { /** * Controller auth token + * + * <br/>NOTE: jobs MUST NOT allow references to external tokens via URL or path to prevent: + * (a) file system crawling + * (b) unauthorized injection of system tokens from the server's local file system. + * + * Instead, resolve tokens right when the job command is run. This allows injection of client-local credentials. */ private String _authToken; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java index e723b4c987..e48cd63354 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java @@ -22,7 +22,7 @@ import java.net.URI; import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.spi.annotations.InterfaceStability; -import org.apache.pinot.spi.auth.AuthContext; +import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.table.TableConfig; @@ -49,17 +49,17 @@ public interface SegmentUploader { /** * Uploads the segment tar file to the cluster * @param segmentTarFile URI of segment tar file - * @param authContext auth details required to upload pinot segment to controller + * @param authProvider auth provider */ - void uploadSegment(URI segmentTarFile, @Nullable AuthContext authContext) + void uploadSegment(URI segmentTarFile, @Nullable AuthProvider authProvider) throws Exception; /** * Uploads the segments from the segmentDir to the cluster. * Looks for segmentTar files recursively, with suffix .tar.gz * @param segmentDir URI of directory containing segment tar files - * @param authContext auth details required to upload pinot segment to controller + * @param authProvider auth auth provider */ - void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthContext authContext) + void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthProvider authProvider) throws Exception; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 15fdc312a5..37fc66fab4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -34,6 +34,7 @@ public class CommonConstants { public static final String HTTPS_PROTOCOL = "https"; public static final String KEY_OF_AUTH_TOKEN = "auth.token"; + public static final String KEY_OF_AUTH_TOKEN_URL = "auth.token.url"; public static final String TABLE_NAME = "tableName"; @@ -433,6 +434,7 @@ public class CommonConstants { * E.g. null (auth disabled), "Basic abcdef..." (basic auth), "Bearer 123def..." (oauth2) */ public static final String CONFIG_OF_SEGMENT_UPLOADER_AUTH_TOKEN = KEY_OF_AUTH_TOKEN; + public static final String CONFIG_OF_SEGMENT_UPLOADER_AUTH_TOKEN_URL = KEY_OF_AUTH_TOKEN_URL; public static final int DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS = 300_000; public static final int DEFAULT_OTHER_REQUESTS_TIMEOUT = 10_000; diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java index ebcdb377d2..5e0a4ac218 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java @@ -30,6 +30,7 @@ import java.net.URL; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.minion.MinionClient; import org.apache.pinot.common.utils.TlsUtils; import org.apache.pinot.core.common.MinionConstants; @@ -54,11 +55,12 @@ public class BootstrapTableTool { private final String _controllerHost; private final int _controllerPort; private final String _authToken; + private final String _authTokenUrl; private final String _tableDir; private final MinionClient _minionClient; public BootstrapTableTool(String controllerProtocol, String controllerHost, int controllerPort, String tableDir, - String authToken) { + String authToken, String authTokenUrl) { Preconditions.checkNotNull(controllerProtocol); Preconditions.checkNotNull(controllerHost); Preconditions.checkNotNull(tableDir); @@ -68,6 +70,7 @@ public class BootstrapTableTool { _tableDir = tableDir; _minionClient = new MinionClient(controllerHost, String.valueOf(controllerPort)); _authToken = authToken; + _authTokenUrl = authTokenUrl; } public boolean execute() @@ -117,7 +120,7 @@ public class BootstrapTableTool { return new AddTableCommand().setSchemaFile(schemaFile.getAbsolutePath()) .setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerProtocol(_controllerProtocol) .setControllerHost(_controllerHost).setControllerPort(String.valueOf(_controllerPort)).setExecute(true) - .setAuthToken(_authToken).execute(); + .setAuthToken(_authToken).setAuthTokenUrl(_authTokenUrl).execute(); } private boolean bootstrapOfflineTable(File setupTableTmpDir, String tableName, File schemaFile, @@ -179,7 +182,8 @@ public class BootstrapTableTool { tlsSpec.getTrustStorePassword()); } - spec.setAuthToken(_authToken); + // url-based token needs to be resolved before job run + spec.setAuthToken(AuthProviderUtils.resolveToToken(_authToken, _authTokenUrl)); IngestionJobLauncher.runIngestionJob(spec); } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java index e3b1000ad7..9b463ca652 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java @@ -35,7 +35,7 @@ import javax.annotation.Nullable; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; -import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.core.auth.BasicAuthUtils; import org.apache.pinot.tools.AbstractBaseCommand; import org.apache.pinot.tools.utils.PinotConfigUtils; @@ -126,13 +126,12 @@ public class AbstractBaseAdminCommand extends AbstractBaseCommand { /** * Generate an (optional) HTTP Authorization header given an auth token - * @see HttpClient#makeAuthHeader(String) * * @param authToken auth token * @return list of 0 or 1 "Authorization" headers */ - static List<Header> makeAuthHeader(String authToken) { - return HttpClient.makeAuthHeader(authToken); + static List<Header> makeAuthHeaders(String authToken, String authTokenUrl) { + return AuthProviderUtils.toHeaders(AuthProviderUtils.inferProvider(authToken, authTokenUrl)); } /** diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java index 83fe31eea5..cac2c295e1 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java @@ -59,6 +59,9 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman @CommandLine.Option(names = {"-authToken"}, required = false, description = "Http auth token.") private String _authToken; + @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description = "Http auth token url.") + private String _authTokenUrl; + @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true, description = "Print this message.") private boolean _help = false; @@ -151,7 +154,8 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) { fileUploadDownloadClient.addSchema(FileUploadDownloadClient .getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)), - schema.getSchemaName(), schemaFile, makeAuthHeader(makeAuthToken(_authToken, _user, _password)), + schema.getSchemaName(), schemaFile, makeAuthHeaders(makeAuthToken(_authToken, _user, _password), + _authTokenUrl), Collections.emptyList()); } catch (Exception e) { LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java index ea423d972d..871a4de15b 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java @@ -72,6 +72,9 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command @CommandLine.Option(names = {"-authToken"}, required = false, description = "Http auth token.") private String _authToken; + @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description = "Http auth token url.") + private String _authTokenUrl; + @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true, description = "Print this message.") private boolean _help = false; @@ -146,6 +149,11 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command return this; } + public AddTableCommand setAuthTokenUrl(String authTokenUrl) { + _authTokenUrl = _authTokenUrl; + return this; + } + public AddTableCommand setExecute(boolean exec) { _exec = exec; return this; @@ -165,7 +173,8 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) { fileUploadDownloadClient.addSchema(FileUploadDownloadClient .getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)), - schema.getSchemaName(), schemaFile, makeAuthHeader(makeAuthToken(_authToken, _user, _password)), + schema.getSchemaName(), schemaFile, makeAuthHeaders(makeAuthToken(_authToken, _user, _password), + _authTokenUrl), Collections.emptyList()); } catch (Exception e) { LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e); @@ -177,7 +186,7 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command throws IOException { String res = AbstractBaseAdminCommand .sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableCreate(), node.toString(), - makeAuthHeader(makeAuthToken(_authToken, _user, _password))); + makeAuthHeaders(makeAuthToken(_authToken, _user, _password), _authTokenUrl)); LOGGER.info(res); return res.contains("succesfully added"); } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java index 7001d38ca5..3cb5bd7cbb 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java @@ -71,6 +71,9 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman @CommandLine.Option(names = {"-authToken"}, required = false, description = "Http auth token.") private String _authToken; + @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description = "Http auth token url.") + private String _authTokenUrl; + @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true, description = "Print this message.") private boolean _help = false; @@ -147,7 +150,7 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman Tenant tenant = new Tenant(_role, _name, _instanceCount, _offlineInstanceCount, _realtimeInstanceCount); String res = AbstractBaseAdminCommand .sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTenantCreate(), - tenant.toJsonString(), makeAuthHeader(makeAuthToken(_authToken, _user, _password))); + tenant.toJsonString(), makeAuthHeaders(makeAuthToken(_authToken, _user, _password), _authTokenUrl)); LOGGER.info(res); System.out.print(res); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java index a6accd2b14..72057df286 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java @@ -87,6 +87,9 @@ public class BootstrapTableCommand extends AbstractBaseAdminCommand implements C @CommandLine.Option(names = {"-authToken"}, required = false, description = "Http auth token.") private String _authToken; + @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description = "Http auth token url.") + private String _authTokenUrl; + @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true, description = "Print this message.") private boolean _help = false; @@ -128,7 +131,8 @@ public class BootstrapTableCommand extends AbstractBaseAdminCommand implements C _controllerHost = NetUtils.getHostAddress(); } String token = makeAuthToken(_authToken, _user, _password); - return new BootstrapTableTool(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort), _dir, token) + return new BootstrapTableTool(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort), _dir, token, + _authTokenUrl) .execute(); } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java index d098935b3a..381b3b2e1e 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java @@ -21,7 +21,6 @@ package org.apache.pinot.tools.admin.command; import java.net.URI; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.methods.GetMethod; -import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.NetUtils; import org.apache.pinot.tools.Command; @@ -58,6 +57,9 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman @CommandLine.Option(names = {"-authToken"}, required = false, description = "Http auth token.") private String _authToken; + @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description = "Http auth token url.") + private String _authTokenUrl; + @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true, description = "Print this message.") private boolean _help = false; @@ -81,9 +83,9 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman String token = makeAuthToken(_authToken, _user, _password); GetMethod httpGet = new GetMethod(uri.toString()); - if (StringUtils.isNotBlank(token)) { - httpGet.setRequestHeader("Authorization", token); - } + makeAuthHeaders(makeAuthToken(_authToken, _user, _password), _authTokenUrl) + .forEach(header -> httpGet.addRequestHeader(header.getName(), header.getValue())); + int status = httpClient.executeMethod(httpGet); if (status != 200) { throw new RuntimeException("Failed to change table state, error: " + httpGet.getResponseBodyAsString()); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java index 42161a1a9d..0e135d8e57 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; @@ -85,6 +86,9 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma @CommandLine.Option(names = {"-authToken"}, required = false, description = "Http auth token.") private String _authToken; + @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description = "Http auth token url.") + private String _authTokenUrl; + @CommandLine.Option(names = {"-tempDir"}, description = "Temporary directory used to hold data during segment creation.") private String _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName()).getAbsolutePath(); @@ -148,6 +152,11 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma return this; } + public ImportDataCommand setAuthTokenUrl(String authTokenUrl) { + _authTokenUrl = authTokenUrl; + return this; + } + public List<String> getAdditionalConfigs() { return _additionalConfigs; } @@ -253,7 +262,7 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma spec.setCleanUpOutputDir(true); spec.setOverwriteOutput(true); spec.setJobType("SegmentCreationAndTarPush"); - spec.setAuthToken(makeAuthToken(_authToken, _user, _password)); + spec.setAuthToken(AuthProviderUtils.resolveToToken(makeAuthToken(_authToken, _user, _password), _authTokenUrl)); // set ExecutionFrameworkSpec ExecutionFrameworkSpec executionFrameworkSpec = new ExecutionFrameworkSpec(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java index 48c58a3949..c1a8b0f8df 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java @@ -21,6 +21,7 @@ package org.apache.pinot.tools.admin.command; import java.util.Arrays; import java.util.List; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.utils.TlsUtils; import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; @@ -58,6 +59,8 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl private String _password; @CommandLine.Option(names = {"-authToken"}, required = false, description = "Http auth token.") private String _authToken; + @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description = "Http auth token url.") + private String _authTokenUrl; public String getJobSpecFile() { return _jobSpecFile; @@ -114,7 +117,7 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl } if (StringUtils.isBlank(spec.getAuthToken())) { - spec.setAuthToken(makeAuthToken(_authToken, _user, _password)); + spec.setAuthToken(AuthProviderUtils.resolveToToken(makeAuthToken(_authToken, _user, _password), _authTokenUrl)); } try { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java index 35fe6dbd27..b8647ddf64 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java @@ -55,6 +55,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem @CommandLine.Option(names = {"-authToken"}, required = false, description = "Http auth token.") private String _authToken; + @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description = "Http auth token url.") + private String _authTokenUrl; + @CommandLine.Option(names = {"-config"}, description = "Cluster config to operate.") private String _config; @@ -148,7 +151,7 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem } String clusterConfigUrl = _controllerProtocol + "://" + _controllerHost + ":" + _controllerPort + "/cluster/configs"; - List<Header> headers = makeAuthHeader(makeAuthToken(_authToken, _user, _password)); + List<Header> headers = makeAuthHeaders(makeAuthToken(_authToken, _user, _password), _authTokenUrl); switch (_operation.toUpperCase()) { case "ADD": case "UPDATE": diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java index 60c3c02d41..e8f6194360 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java @@ -54,6 +54,9 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman @CommandLine.Option(names = {"-authToken"}, required = false, description = "Http auth token.") private String _authToken; + @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description = "Http auth token url.") + private String _authTokenUrl; + @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true, description = "Print " + "this message.") private boolean _help = false; @@ -113,6 +116,11 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman return this; } + public PostQueryCommand setAuthTokenUrl(String authTokenUrl) { + _authTokenUrl = authTokenUrl; + return this; + } + public PostQueryCommand setQuery(String query) { _query = query; return this; @@ -126,7 +134,8 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman LOGGER.info("Executing command: " + this); String url = _brokerProtocol + "://" + _brokerHost + ":" + _brokerPort + "/query/sql"; String request = JsonUtils.objectToString(Collections.singletonMap(Request.SQL, _query)); - return sendRequest("POST", url, request, makeAuthHeader(makeAuthToken(_authToken, _user, _password))); + return sendRequest("POST", url, request, makeAuthHeaders(makeAuthToken(_authToken, _user, _password), + _authTokenUrl)); } @Override diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java index faa005d936..3f241666c6 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java @@ -73,6 +73,7 @@ public class QuickstartRunner { private final File _tempDir; private final boolean _enableTenantIsolation; private final String _authToken; + private final String _authTokenUrl; private final Map<String, Object> _configOverrides; private final boolean _deleteExistingData; @@ -86,7 +87,8 @@ public class QuickstartRunner { public QuickstartRunner(List<QuickstartTableRequest> tableRequests, int numControllers, int numBrokers, int numServers, int numMinions, File tempDir, Map<String, Object> configOverrides) throws Exception { - this(tableRequests, numControllers, numBrokers, numServers, numMinions, tempDir, true, null, configOverrides, null, + this(tableRequests, numControllers, numBrokers, numServers, numMinions, tempDir, true, null, + null, configOverrides, null, true); } @@ -94,6 +96,14 @@ public class QuickstartRunner { int numServers, int numMinions, File tempDir, boolean enableIsolation, String authToken, Map<String, Object> configOverrides, String zkExternalAddress, boolean deleteExistingData) throws Exception { + this(tableRequests, numControllers, numBrokers, numServers, numMinions, tempDir, enableIsolation, authToken, null, + configOverrides, zkExternalAddress, deleteExistingData); + } + + public QuickstartRunner(List<QuickstartTableRequest> tableRequests, int numControllers, int numBrokers, + int numServers, int numMinions, File tempDir, boolean enableIsolation, String authToken, String authTokenUrl, + Map<String, Object> configOverrides, String zkExternalAddress, boolean deleteExistingData) + throws Exception { _tableRequests = tableRequests; _numControllers = numControllers; _numBrokers = numBrokers; @@ -102,6 +112,7 @@ public class QuickstartRunner { _tempDir = tempDir; _enableTenantIsolation = enableIsolation; _authToken = authToken; + _authTokenUrl = authTokenUrl; _configOverrides = configOverrides; _zkExternalAddress = zkExternalAddress; _deleteExistingData = deleteExistingData; @@ -228,7 +239,7 @@ public class QuickstartRunner { throws Exception { for (QuickstartTableRequest request : _tableRequests) { if (!new BootstrapTableTool("http", "localhost", _controllerPorts.get(0), request.getBootstrapTableDir(), - _authToken).execute()) { + _authToken, _authTokenUrl).execute()) { throw new RuntimeException("Failed to bootstrap table with request - " + request); } } @@ -273,8 +284,8 @@ public class QuickstartRunner { throws Exception { int brokerPort = _brokerPorts.get(RANDOM.nextInt(_brokerPorts.size())); return JsonUtils.stringToJsonNode( - new PostQueryCommand().setBrokerPort(String.valueOf(brokerPort)).setAuthToken(_authToken).setQuery(query) - .run()); + new PostQueryCommand().setBrokerPort(String.valueOf(brokerPort)).setAuthToken(_authToken) + .setAuthTokenUrl(_authTokenUrl).setQuery(query).run()); } public static void registerDefaultPinotFS() { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java index 90957dcc91..64610150db 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java @@ -24,6 +24,7 @@ import java.net.URI; import java.util.Collections; import org.apache.commons.io.FileUtils; import org.apache.http.message.BasicNameValuePair; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.utils.http.HttpClient; @@ -63,6 +64,9 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co @CommandLine.Option(names = {"-authToken"}, required = false, description = "Http auth token.") private String _authToken; + @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description = "Http auth token url.") + private String _authTokenUrl; + @CommandLine.Option(names = {"-segmentDir"}, required = true, description = "Path to segment directory.") private String _segmentDir = null; @@ -129,6 +133,11 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co return this; } + public UploadSegmentCommand setAuthTokenUrl(String authTokenUrl) { + _authTokenUrl = authTokenUrl; + return this; + } + public UploadSegmentCommand setSegmentDir(String segmentDir) { _segmentDir = segmentDir; return this; @@ -168,8 +177,9 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co LOGGER.info("Uploading segment tar file: {}", segmentTarFile); fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, - makeAuthHeader(makeAuthToken(_authToken, _user, _password)), Collections - .singletonList(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, _tableName)), + AuthProviderUtils.toHeaders(AuthProviderUtils.inferProvider(makeAuthToken(_authToken, _user, _password), + _authTokenUrl)), Collections.singletonList(new BasicNameValuePair( + FileUploadDownloadClient.QueryParameters.TABLE_NAME, _tableName)), HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); } } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
