This is an automated email from the ASF dual-hosted git repository.
apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2c3813bb97 Add pluggable client auth provider (#8670)
2c3813bb97 is described below
commit 2c3813bb9722d0ec6053bafbbdd07d7d2bd32cc3
Author: Alexander Pucher <[email protected]>
AuthorDate: Tue May 17 12:07:24 2022 -0700
Add pluggable client auth provider (#8670)
add support for pluggable client auth providers, which enables pinot
components (controller, server, minion) to use dynamically changing tokens,
such as kubernetes service account JWTs
---
.../org/apache/pinot/common/auth/AuthConfig.java | 23 ++-
.../pinot/common/auth/AuthProviderUtils.java | 171 +++++++++++++++++++++
.../apache/pinot/common/auth/NullAuthProvider.java | 29 +++-
.../pinot/common/auth/StaticTokenAuthProvider.java | 68 ++++++++
.../apache/pinot/common/auth/UrlAuthProvider.java | 87 +++++++++++
.../common/utils/FileUploadDownloadClient.java | 45 +++---
.../common/utils/fetcher/BaseSegmentFetcher.java | 7 +-
.../common/utils/fetcher/HttpSegmentFetcher.java | 4 +-
.../utils/fetcher/SegmentFetcherFactory.java | 15 +-
.../apache/pinot/common/utils/http/HttpClient.java | 70 +++------
.../resources/PinotIngestionRestletResource.java | 12 +-
.../pinot/controller/util/FileIngestionHelper.java | 10 +-
.../core/data/manager/BaseTableDataManager.java | 18 ++-
.../manager/realtime/SegmentCommitterFactory.java | 2 +-
.../realtime/Server2ControllerSegmentUploader.java | 11 +-
.../ServerSegmentCompletionProtocolHandler.java | 18 ++-
.../BaseTableDataManagerAcquireSegmentTest.java | 3 +
.../data/manager/BaseTableDataManagerTest.java | 3 +
.../tests/BasicAuthBatchIntegrationTest.java | 4 +-
.../integration/tests/TlsIntegrationTest.java | 7 +
...st.java => UrlAuthRealtimeIntegrationTest.java} | 55 +++++--
.../src/test/resources/url-auth-token-prefixed.txt | 1 +
.../src/test/resources/url-auth-token.txt | 1 +
.../BaseMultipleSegmentsConversionExecutor.java | 19 +--
.../tasks/BaseSingleSegmentConversionExecutor.java | 7 +-
.../minion/tasks/SegmentConversionUtils.java | 17 +-
.../segmentuploader/SegmentUploaderDefault.java | 10 +-
.../local/data/manager/TableDataManagerConfig.java | 14 +-
.../pinot/segment/local/utils/IngestionUtils.java | 17 +-
.../segment/local/utils/SegmentPushUtils.java | 14 +-
.../starter/helix/HelixInstanceDataManager.java | 5 +-
.../helix/HelixInstanceDataManagerConfig.java | 8 -
.../auth/{AuthContext.java => AuthProvider.java} | 21 +--
.../config/instance/InstanceDataManagerConfig.java | 2 -
.../batch/spec/SegmentGenerationJobSpec.java | 6 +
.../segment/uploader/SegmentUploader.java | 10 +-
.../apache/pinot/spi/utils/CommonConstants.java | 6 +-
.../org/apache/pinot/tools/AuthQuickstart.java | 6 +-
.../org/apache/pinot/tools/BootstrapTableTool.java | 13 +-
.../org/apache/pinot/tools/EmptyQuickstart.java | 5 +-
.../java/org/apache/pinot/tools/Quickstart.java | 5 +-
.../admin/command/AbstractBaseAdminCommand.java | 38 +++--
.../tools/admin/command/AddSchemaCommand.java | 14 +-
.../pinot/tools/admin/command/AddTableCommand.java | 20 ++-
.../tools/admin/command/AddTenantCommand.java | 17 +-
.../tools/admin/command/BootstrapTableCommand.java | 16 +-
.../tools/admin/command/ChangeTableState.java | 20 ++-
.../tools/admin/command/ImportDataCommand.java | 12 +-
.../command/LaunchDataIngestionJobCommand.java | 11 +-
.../admin/command/OperateClusterConfigCommand.java | 19 ++-
.../tools/admin/command/PostQueryCommand.java | 17 +-
.../tools/admin/command/QuickstartRunner.java | 15 +-
.../tools/admin/command/UploadSegmentCommand.java | 20 ++-
53 files changed, 784 insertions(+), 284 deletions(-)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
b/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthConfig.java
similarity index 60%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
copy to pinot-common/src/main/java/org/apache/pinot/common/auth/AuthConfig.java
index 5a9798c355..60e117bd98 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthConfig.java
@@ -16,19 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.auth;
+package org.apache.pinot.common.auth;
+
+import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
/**
- * Container for all auth related info
+ * Standardized auth config container for AuthProvider
+ * @see AuthProviderUtils#extractAuthConfig(PinotConfiguration, String)
*/
-public class AuthContext {
- private final String _authToken;
+public class AuthConfig {
+ public static final String PROVIDER_CLASS = "provider.class";
+
+ protected Map<String, Object> _properties;
- public AuthContext(String authToken) {
- _authToken = authToken;
+ public AuthConfig(Map<String, Object> properties) {
+ _properties = properties;
}
- public String getAuthToken() {
- return _authToken;
+ public Map<String, Object> getProperties() {
+ return _properties;
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java
new file mode 100644
index 0000000000..48b4fc94bb
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.auth;
+
+import java.lang.reflect.Constructor;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+/**
+ * Utility class to wrap inference of optimal auth provider from component
configs.
+ */
+public final class AuthProviderUtils {
+ private AuthProviderUtils() {
+ // left blank
+ }
+
+ /**
+ * Extract an AuthConfig from a pinot configuration subset namespace.
+ *
+ * @param pinotConfig pinot configuration
+ * @param namespace subset namespace
+ * @return auth config
+ */
+ public static AuthConfig extractAuthConfig(PinotConfiguration pinotConfig,
String namespace) {
+ if (namespace == null) {
+ return new AuthConfig(pinotConfig.toMap());
+ }
+ return new AuthConfig(pinotConfig.subset(namespace).toMap());
+ }
+
+ /**
+ * Create an AuthProvider after extracting a config from a pinot
configuration subset namespace
+ * @see AuthProviderUtils#extractAuthConfig(PinotConfiguration, String)
+ *
+ * @param pinotConfig pinot configuration
+ * @param namespace subset namespace
+ * @return auth provider
+ */
+ public static AuthProvider extractAuthProvider(PinotConfiguration
pinotConfig, String namespace) {
+ return makeAuthProvider(extractAuthConfig(pinotConfig, namespace));
+ }
+
+ /**
+ * Create auth provider based on the availability of a static token only, if
any. This typically applies to task specs
+ *
+ * @param authToken static auth token
+ * @return auth provider
+ */
+ public static AuthProvider makeAuthProvider(String authToken) {
+ if (StringUtils.isBlank(authToken)) {
+ return new NullAuthProvider();
+ }
+ return new StaticTokenAuthProvider(authToken);
+ }
+
+ /**
+ * Create auth provider based on an auth config. Mimics legacy behavior for
static tokens if provided, or dynamic auth
+ * providers if additional configs are given.
+ *
+ * @param authConfig auth config
+ * @return auth provider
+ */
+ public static AuthProvider makeAuthProvider(AuthConfig authConfig) {
+ if (authConfig == null) {
+ return new NullAuthProvider();
+ }
+
+ Object providerClassValue =
authConfig.getProperties().get(AuthConfig.PROVIDER_CLASS);
+ if (providerClassValue != null) {
+ try {
+ Class<?> providerClass = Class.forName(providerClassValue.toString());
+ Constructor<?> constructor =
providerClass.getConstructor(AuthConfig.class);
+ return (AuthProvider) constructor.newInstance(authConfig);
+ } catch (Exception e) {
+ throw new IllegalStateException("Could not create AuthProvider " +
providerClassValue, e);
+ }
+ }
+
+ // mimic legacy behavior for "auth.token" property
+ if (authConfig.getProperties().containsKey(StaticTokenAuthProvider.TOKEN))
{
+ return new StaticTokenAuthProvider(authConfig);
+ }
+
+ if (!authConfig.getProperties().isEmpty()) {
+ throw new IllegalArgumentException("Some auth properties defined, but no
provider created. Aborting.");
+ }
+
+ return new NullAuthProvider();
+ }
+
+ /**
+ * Convenience helper to convert Map to list of Http Headers
+ * @param headers header map
+ * @return list of http headers
+ */
+ public static List<Header> toRequestHeaders(@Nullable Map<String, Object>
headers) {
+ if (headers == null) {
+ return Collections.emptyList();
+ }
+ return headers.entrySet().stream().filter(entry ->
Objects.nonNull(entry.getValue()))
+ .map(entry -> new BasicHeader(entry.getKey(),
entry.getValue().toString())).collect(Collectors.toList());
+ }
+
+ /**
+ * Convenience helper to convert an optional authProvider to a list of http
headers
+ * @param authProvider auth provider
+ * @return list of http headers
+ */
+ public static List<Header> toRequestHeaders(@Nullable AuthProvider
authProvider) {
+ if (authProvider == null) {
+ return Collections.emptyList();
+ }
+ return toRequestHeaders(authProvider.getRequestHeaders());
+ }
+
+ /**
+ * Convenience helper to convert an optional authProvider to a static job
spec token
+ * @param authProvider auth provider
+ * @return static token
+ */
+ public static String toStaticToken(@Nullable AuthProvider authProvider) {
+ if (authProvider == null) {
+ return null;
+ }
+ return authProvider.getTaskToken();
+ }
+
+ /**
+ * Helper to extract string values from complex AuthConfig instance.
+ *
+ * @param config auth config
+ * @param key config key
+ * @param defaultValue default value
+ * @return config value
+ */
+ static String getOrDefault(AuthConfig config, String key, String
defaultValue) {
+ if (config == null || !config.getProperties().containsKey(key)) {
+ return defaultValue;
+ }
+ if (config.getProperties().get(key) instanceof String) {
+ return (String) config.getProperties().get(key);
+ }
+ throw new IllegalArgumentException("Expected String but got " +
config.getProperties().get(key).getClass());
+ }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
b/pinot-common/src/main/java/org/apache/pinot/common/auth/NullAuthProvider.java
similarity index 62%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
copy to
pinot-common/src/main/java/org/apache/pinot/common/auth/NullAuthProvider.java
index 5a9798c355..88b1cdc565 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/auth/NullAuthProvider.java
@@ -16,19 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.auth;
+package org.apache.pinot.common.auth;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.pinot.spi.auth.AuthProvider;
+
/**
- * Container for all auth related info
+ * Noop auth provider
*/
-public class AuthContext {
- private final String _authToken;
+public class NullAuthProvider implements AuthProvider {
+ public NullAuthProvider() {
+ // left blank
+ }
+
+ public NullAuthProvider(AuthConfig ignore) {
+ // left blank
+ }
- public AuthContext(String authToken) {
- _authToken = authToken;
+ @Override
+ public Map<String, Object> getRequestHeaders() {
+ return Collections.emptyMap();
}
- public String getAuthToken() {
- return _authToken;
+ @Override
+ public String getTaskToken() {
+ return null;
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/auth/StaticTokenAuthProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/auth/StaticTokenAuthProvider.java
new file mode 100644
index 0000000000..2f4b0f1845
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/auth/StaticTokenAuthProvider.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.auth;
+
+import java.util.Collections;
+import java.util.Map;
+import javax.ws.rs.core.HttpHeaders;
+import org.apache.pinot.spi.auth.AuthProvider;
+
+
+/**
+ * Auth provider for static client tokens, typically used for job specs or
when mimicking legacy behavior.
+ */
+public class StaticTokenAuthProvider implements AuthProvider {
+ public static final String HEADER = "header";
+ public static final String PREFIX = "prefix";
+ public static final String TOKEN = "token";
+
+ protected final String _taskToken;
+ protected final Map<String, Object> _requestHeaders;
+
+ public StaticTokenAuthProvider(String token) {
+ _taskToken = token;
+ _requestHeaders = Collections.singletonMap(HttpHeaders.AUTHORIZATION,
token);
+ }
+
+ public StaticTokenAuthProvider(AuthConfig authConfig) {
+ String header = AuthProviderUtils.getOrDefault(authConfig, HEADER,
HttpHeaders.AUTHORIZATION);
+ String prefix = AuthProviderUtils.getOrDefault(authConfig, PREFIX,
"Basic");
+ String userToken = authConfig.getProperties().get(TOKEN).toString();
+
+ _taskToken = makeToken(prefix, userToken);
+ _requestHeaders = Collections.singletonMap(header, _taskToken);
+ }
+
+ @Override
+ public Map<String, Object> getRequestHeaders() {
+ return _requestHeaders;
+ }
+
+ @Override
+ public String getTaskToken() {
+ return _taskToken;
+ }
+
+ private static String makeToken(String prefix, String token) {
+ if (token.startsWith(prefix)) {
+ return token;
+ }
+ return prefix + " " + token;
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/auth/UrlAuthProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/auth/UrlAuthProvider.java
new file mode 100644
index 0000000000..721a743552
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/auth/UrlAuthProvider.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.auth;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import javax.ws.rs.core.HttpHeaders;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.spi.auth.AuthProvider;
+
+
+/**
+ * Auth provider with dynamic loading support, typically used for rotating
tokens such as those injected by kubernetes.
+ * UrlAuthProvider will re-read the source on every invocation, so beware of
long round-trip times if the source is
+ * remote.
+ */
+public class UrlAuthProvider implements AuthProvider {
+ public static final String HEADER = "header";
+ public static final String PREFIX = "prefix";
+ public static final String URL = "url";
+
+ protected final String _header;
+ protected final String _prefix;
+ protected final URL _url;
+
+ public UrlAuthProvider(String url) {
+ try {
+ _header = HttpHeaders.AUTHORIZATION;
+ _prefix = "Bearer";
+ _url = new URL(url);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ public UrlAuthProvider(AuthConfig authConfig) {
+ try {
+ _header = AuthProviderUtils.getOrDefault(authConfig, HEADER,
HttpHeaders.AUTHORIZATION);
+ _prefix = AuthProviderUtils.getOrDefault(authConfig, PREFIX, "Bearer");
+ _url = new URL(authConfig.getProperties().get(URL).toString());
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public Map<String, Object> getRequestHeaders() {
+ return Collections.singletonMap(_header, makeToken());
+ }
+
+ @Override
+ public String getTaskToken() {
+ return makeToken();
+ }
+
+ private String makeToken() {
+ try {
+ String token = IOUtils.toString(_url, StandardCharsets.UTF_8);
+ if (token.startsWith(_prefix)) {
+ return token;
+ }
+ return _prefix + " " + token;
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Could not resolve auth url " + _url,
e);
+ }
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 1d10de2bca..ac4413ff35 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -48,9 +48,11 @@ import org.apache.http.entity.mime.content.ContentBody;
import org.apache.http.entity.mime.content.FileBody;
import org.apache.http.entity.mime.content.InputStreamBody;
import org.apache.http.message.BasicNameValuePair;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -402,23 +404,20 @@ public class FileUploadDownloadClient implements
AutoCloseable {
}
private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String
jsonRequestBody, int socketTimeoutMs,
- @Nullable String authToken) {
+ @Nullable AuthProvider authProvider) {
RequestBuilder requestBuilder =
RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1)
.setHeader(HttpHeaders.CONTENT_TYPE, HttpClient.JSON_CONTENT_TYPE)
.setEntity(new StringEntity(jsonRequestBody,
ContentType.APPLICATION_JSON));
- if (StringUtils.isNotBlank(authToken)) {
- requestBuilder.addHeader("Authorization", authToken);
- }
+
AuthProviderUtils.toRequestHeaders(authProvider).forEach(requestBuilder::addHeader);
HttpClient.setTimeout(requestBuilder, socketTimeoutMs);
return requestBuilder.build();
}
- private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int
socketTimeoutMs, @Nullable String authToken) {
+ private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int
socketTimeoutMs,
+ @Nullable AuthProvider authProvider) {
RequestBuilder requestBuilder =
RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1)
.setHeader(HttpHeaders.CONTENT_TYPE, HttpClient.JSON_CONTENT_TYPE);
- if (StringUtils.isNotBlank(authToken)) {
- requestBuilder.addHeader("Authorization", authToken);
- }
+
AuthProviderUtils.toRequestHeaders(authProvider).forEach(requestBuilder::addHeader);
HttpClient.setTimeout(requestBuilder, socketTimeoutMs);
return requestBuilder.build();
}
@@ -856,17 +855,17 @@ public class FileUploadDownloadClient implements
AutoCloseable {
*
* @param uri URI
* @param startReplaceSegmentsRequest request
- * @param authToken auth token
+ * @param authProvider auth provider
* @return Response
* @throws IOException
* @throws HttpErrorStatusException
*/
public SimpleHttpResponse startReplaceSegments(URI uri,
StartReplaceSegmentsRequest startReplaceSegmentsRequest,
- @Nullable String authToken)
+ @Nullable AuthProvider authProvider)
throws IOException, HttpErrorStatusException {
return HttpClient.wrapAndThrowHttpException(_httpClient.sendRequest(
getStartReplaceSegmentsRequest(uri,
JsonUtils.objectToString(startReplaceSegmentsRequest),
- HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, authToken)));
+ HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, authProvider)));
}
/**
@@ -874,15 +873,15 @@ public class FileUploadDownloadClient implements
AutoCloseable {
*
* @param uri URI
* @oaram socketTimeoutMs Socket timeout in milliseconds
- * @param authToken auth token
+ * @param authProvider auth provider
* @return Response
* @throws IOException
* @throws HttpErrorStatusException
*/
- public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs,
@Nullable String authToken)
+ public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs,
@Nullable AuthProvider authProvider)
throws IOException, HttpErrorStatusException {
return HttpClient.wrapAndThrowHttpException(
- _httpClient.sendRequest(getEndReplaceSegmentsRequest(uri,
socketTimeoutMs, authToken)));
+ _httpClient.sendRequest(getEndReplaceSegmentsRequest(uri,
socketTimeoutMs, authProvider)));
}
/**
@@ -938,9 +937,9 @@ public class FileUploadDownloadClient implements
AutoCloseable {
/**
* Deprecated due to lack of auth header support. May break for deployments
with auth enabled
*
- * Download a file using default settings, with an optional auth token
+ * Download a file using default settings
*
- * @see HttpClient#downloadFile(URI, int, File, String, List)
+ * @see HttpClient#downloadFile(URI, int, File, AuthProvider, List)
*
* @param uri URI
* @param socketTimeoutMs Socket timeout in milliseconds
@@ -960,7 +959,7 @@ public class FileUploadDownloadClient implements
AutoCloseable {
*
* Download a file.
*
- * @see FileUploadDownloadClient#downloadFile(URI, File, String)
+ * @see FileUploadDownloadClient#downloadFile(URI, File, AuthProvider)
*
* @param uri URI
* @param dest File destination
@@ -979,14 +978,14 @@ public class FileUploadDownloadClient implements
AutoCloseable {
*
* @param uri URI
* @param dest File destination
- * @param authToken auth token
+ * @param authProvider auth provider
* @return Response status code
* @throws IOException
* @throws HttpErrorStatusException
*/
- public int downloadFile(URI uri, File dest, String authToken)
+ public int downloadFile(URI uri, File dest, AuthProvider authProvider)
throws IOException, HttpErrorStatusException {
- return _httpClient.downloadFile(uri, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS,
dest, null, null);
+ return _httpClient.downloadFile(uri, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS,
dest, authProvider, null);
}
/**
@@ -994,15 +993,15 @@ public class FileUploadDownloadClient implements
AutoCloseable {
*
* @param uri URI
* @param dest File destination
- * @param authToken auth token
+ * @param authProvider auth provider
* @param httpHeaders http headers
* @return Response status code
* @throws IOException
* @throws HttpErrorStatusException
*/
- public int downloadFile(URI uri, File dest, String authToken, List<Header>
httpHeaders)
+ public int downloadFile(URI uri, File dest, AuthProvider authProvider,
List<Header> httpHeaders)
throws IOException, HttpErrorStatusException {
- return _httpClient.downloadFile(uri, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS,
dest, authToken, httpHeaders);
+ return _httpClient.downloadFile(uri, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS,
dest, authProvider, httpHeaders);
}
/**
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
index 497822ffb7..d99c256233 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
@@ -22,6 +22,8 @@ import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.Random;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -36,7 +38,6 @@ public abstract class BaseSegmentFetcher implements
SegmentFetcher {
public static final String RETRY_COUNT_CONFIG_KEY = "retry.count";
public static final String RETRY_WAIT_MS_CONFIG_KEY = "retry.wait.ms";
public static final String RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY =
"retry.delay.scale.factor";
- public static final String AUTH_TOKEN = CommonConstants.KEY_OF_AUTH_TOKEN;
public static final int DEFAULT_RETRY_COUNT = 3;
public static final int DEFAULT_RETRY_WAIT_MS = 100;
public static final int DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5;
@@ -46,14 +47,14 @@ public abstract class BaseSegmentFetcher implements
SegmentFetcher {
protected int _retryCount;
protected int _retryWaitMs;
protected int _retryDelayScaleFactor;
- protected String _authToken;
+ protected AuthProvider _authProvider;
@Override
public void init(PinotConfiguration config) {
_retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY,
DEFAULT_RETRY_COUNT);
_retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY,
DEFAULT_RETRY_WAIT_MS);
_retryDelayScaleFactor =
config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY,
DEFAULT_RETRY_DELAY_SCALE_FACTOR);
- _authToken = config.getProperty(AUTH_TOKEN);
+ _authProvider = AuthProviderUtils.extractAuthProvider(config,
CommonConstants.KEY_OF_AUTH);
doInit(config);
_logger
.info("Initialized with retryCount: {}, retryWaitMs: {},
retryDelayScaleFactor: {}", _retryCount, _retryWaitMs,
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index 1c8c9286d5..a2f31ad3bb 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -63,7 +63,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
if (!InetAddresses.isInetAddress(hostName)) {
httpHeaders.add(new BasicHeader(HttpHeaders.HOST, hostName + ":" +
port));
}
- int statusCode = _httpClient.downloadFile(uri, dest, _authToken,
httpHeaders);
+ int statusCode = _httpClient.downloadFile(uri, dest, _authProvider,
httpHeaders);
_logger
.info("Downloaded segment from: {} to: {} of size: {}; Response
status code: {}", uri, dest, dest.length(),
statusCode);
@@ -94,7 +94,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
public void fetchSegmentToLocalWithoutRetry(URI uri, File dest)
throws Exception {
try {
- int statusCode = _httpClient.downloadFile(uri, dest, _authToken);
+ int statusCode = _httpClient.downloadFile(uri, dest, _authProvider);
_logger.info("Downloaded segment from: {} to: {} of size: {}; Response
status code: {}", uri, dest, dest.length(),
statusCode);
} catch (Exception e) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
index 743cd1b7d0..f9835206d7 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
@@ -24,7 +24,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.auth.AuthConfig;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.spi.crypt.PinotCrypter;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -38,8 +39,8 @@ public class SegmentFetcherFactory {
static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class";
private static final String PROTOCOLS_KEY = "protocols";
- private static final String AUTH_TOKEN_KEY =
CommonConstants.KEY_OF_AUTH_TOKEN;
private static final String ENCODED_SUFFIX = ".enc";
+ private static final String AUTH_KEY = CommonConstants.KEY_OF_AUTH;
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentFetcherFactory.class);
@@ -90,10 +91,14 @@ public class SegmentFetcherFactory {
segmentFetcher = (SegmentFetcher)
Class.forName(segmentFetcherClassName).newInstance();
}
- String authToken = config.getProperty(AUTH_TOKEN_KEY);
+ AuthConfig authConfig = AuthProviderUtils.extractAuthConfig(config,
AUTH_KEY);
+
+ PinotConfiguration subConfig = config.subset(protocol);
+ AuthConfig subAuthConfig =
AuthProviderUtils.extractAuthConfig(subConfig, AUTH_KEY);
+
Map<String, Object> subConfigMap = config.subset(protocol).toMap();
- if (!subConfigMap.containsKey(AUTH_TOKEN_KEY) &&
StringUtils.isNotBlank(authToken)) {
- subConfigMap.put(AUTH_TOKEN_KEY, authToken);
+ if (subAuthConfig.getProperties().isEmpty() &&
!authConfig.getProperties().isEmpty()) {
+ authConfig.getProperties().forEach((key, value) ->
subConfigMap.put(AUTH_KEY + "." + key, value));
}
segmentFetcher.init(new PinotConfiguration(subConfigMap));
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java
index 62948d2921..79b050bf68 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java
@@ -35,7 +35,6 @@ import javax.net.ssl.SSLContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
@@ -55,12 +54,13 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
-import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.SimpleHttpErrorInfo;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.TlsUtils;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
@@ -110,7 +110,7 @@ public class HttpClient implements AutoCloseable {
/**
* Deprecated due to lack of auth header support. May break for deployments
with auth enabled
*
- * @see #sendGetRequest(URI, Map, String)
+ * @see #sendGetRequest(URI, Map, AuthProvider)
*/
public SimpleHttpResponse sendGetRequest(URI uri)
throws IOException {
@@ -122,17 +122,16 @@ public class HttpClient implements AutoCloseable {
return sendGetRequest(uri, headers, null);
}
- public SimpleHttpResponse sendGetRequest(URI uri, @Nullable Map<String,
String> headers, @Nullable String authToken)
+ public SimpleHttpResponse sendGetRequest(URI uri, @Nullable Map<String,
String> headers,
+ @Nullable AuthProvider authProvider)
throws IOException {
RequestBuilder requestBuilder =
RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
+
AuthProviderUtils.toRequestHeaders(authProvider).forEach(requestBuilder::addHeader);
if (MapUtils.isNotEmpty(headers)) {
for (Map.Entry<String, String> header : headers.entrySet()) {
requestBuilder.addHeader(header.getKey(), header.getValue());
}
}
- if (StringUtils.isNotBlank(authToken)) {
- requestBuilder.addHeader(AUTH_HTTP_HEADER, authToken);
- }
setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS);
return sendRequest(requestBuilder.build());
}
@@ -140,7 +139,7 @@ public class HttpClient implements AutoCloseable {
/**
* Deprecated due to lack of auth header support. May break for deployments
with auth enabled
*
- * @see #sendDeleteRequest(URI, Map, String)
+ * @see #sendDeleteRequest(URI, Map, AuthProvider)
*/
public SimpleHttpResponse sendDeleteRequest(URI uri)
throws IOException {
@@ -153,12 +152,10 @@ public class HttpClient implements AutoCloseable {
}
public SimpleHttpResponse sendDeleteRequest(URI uri, @Nullable Map<String,
String> headers,
- @Nullable String authToken)
+ @Nullable AuthProvider authProvider)
throws IOException {
RequestBuilder requestBuilder =
RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1);
- if (StringUtils.isNotBlank(authToken)) {
- requestBuilder.addHeader(AUTH_HTTP_HEADER, authToken);
- }
+
AuthProviderUtils.toRequestHeaders(authProvider).forEach(requestBuilder::addHeader);
if (MapUtils.isNotEmpty(headers)) {
for (Map.Entry<String, String> header : headers.entrySet()) {
requestBuilder.addHeader(header.getKey(), header.getValue());
@@ -171,7 +168,7 @@ public class HttpClient implements AutoCloseable {
/**
* Deprecated due to lack of auth header support. May break for deployments
with auth enabled
*
- * @see #sendPostRequest(URI, HttpEntity, Map, String)
+ * @see #sendPostRequest(URI, HttpEntity, Map, AuthProvider)
*/
public SimpleHttpResponse sendPostRequest(URI uri, @Nullable HttpEntity
payload,
@Nullable Map<String, String> headers)
@@ -180,15 +177,13 @@ public class HttpClient implements AutoCloseable {
}
public SimpleHttpResponse sendPostRequest(URI uri, @Nullable HttpEntity
payload,
- @Nullable Map<String, String> headers, @Nullable String authToken)
+ @Nullable Map<String, String> headers, @Nullable AuthProvider
authProvider)
throws IOException {
RequestBuilder requestBuilder =
RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1);
if (payload != null) {
requestBuilder.setEntity(payload);
}
- if (StringUtils.isNotBlank(authToken)) {
- requestBuilder.addHeader(AUTH_HTTP_HEADER, authToken);
- }
+
AuthProviderUtils.toRequestHeaders(authProvider).forEach(requestBuilder::addHeader);
if (MapUtils.isNotEmpty(headers)) {
for (Map.Entry<String, String> header : headers.entrySet()) {
requestBuilder.addHeader(header.getKey(), header.getValue());
@@ -201,7 +196,7 @@ public class HttpClient implements AutoCloseable {
/**
* Deprecated due to lack of auth header support. May break for deployments
with auth enabled
*
- * @see #sendPutRequest(URI, HttpEntity, Map, String)
+ * @see #sendPutRequest(URI, HttpEntity, Map, AuthProvider)
*/
public SimpleHttpResponse sendPutRequest(URI uri, @Nullable HttpEntity
payload, @Nullable Map<String, String> headers)
throws IOException {
@@ -209,15 +204,13 @@ public class HttpClient implements AutoCloseable {
}
public SimpleHttpResponse sendPutRequest(URI uri, @Nullable HttpEntity
payload, @Nullable Map<String, String> headers,
- @Nullable String authToken)
+ @Nullable AuthProvider authProvider)
throws IOException {
RequestBuilder requestBuilder =
RequestBuilder.put(uri).setVersion(HttpVersion.HTTP_1_1);
if (payload != null) {
requestBuilder.setEntity(payload);
}
- if (StringUtils.isNotBlank(authToken)) {
- requestBuilder.addHeader(AUTH_HTTP_HEADER, authToken);
- }
+
AuthProviderUtils.toRequestHeaders(authProvider).forEach(requestBuilder::addHeader);
if (MapUtils.isNotEmpty(headers)) {
for (Map.Entry<String, String> header : headers.entrySet()) {
requestBuilder.addHeader(header.getKey(), header.getValue());
@@ -243,13 +236,13 @@ public class HttpClient implements AutoCloseable {
}
public SimpleHttpResponse sendJsonPostRequest(URI uri, @Nullable String
jsonRequestBody,
- @Nullable Map<String, String> headers, @Nullable String authToken)
+ @Nullable Map<String, String> headers, @Nullable AuthProvider
authProvider)
throws IOException {
Map<String, String> headersWrapper = MapUtils.isEmpty(headers) ? new
HashMap<>() : new HashMap<>(headers);
headersWrapper.put(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE);
HttpEntity entity =
jsonRequestBody == null ? null : new StringEntity(jsonRequestBody,
ContentType.APPLICATION_JSON);
- return sendPostRequest(uri, entity, headers, authToken);
+ return sendPostRequest(uri, entity, headers, authProvider);
}
public SimpleHttpResponse sendJsonPutRequest(URI uri, @Nullable String
jsonRequestBody)
@@ -264,13 +257,13 @@ public class HttpClient implements AutoCloseable {
}
public SimpleHttpResponse sendJsonPutRequest(URI uri, @Nullable String
jsonRequestBody,
- @Nullable Map<String, String> headers, @Nullable String authToken)
+ @Nullable Map<String, String> headers, @Nullable AuthProvider
authProvider)
throws IOException {
Map<String, String> headersWrapper = MapUtils.isEmpty(headers) ? new
HashMap<>() : new HashMap<>(headers);
headersWrapper.put(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE);
HttpEntity entity =
jsonRequestBody == null ? null : new StringEntity(jsonRequestBody,
ContentType.APPLICATION_JSON);
- return sendPutRequest(uri, entity, headersWrapper, authToken);
+ return sendPutRequest(uri, entity, headersWrapper, authProvider);
}
// --------------------------------------------------------------------------
@@ -366,15 +359,15 @@ public class HttpClient implements AutoCloseable {
* @param uri URI
* @param socketTimeoutMs Socket timeout in milliseconds
* @param dest File destination
- * @param authToken auth token
+ * @param authProvider auth provider
* @param httpHeaders http headers
* @return Response status code
* @throws IOException
* @throws HttpErrorStatusException
*/
- public int downloadFile(URI uri, int socketTimeoutMs, File dest, String
authToken, List<Header> httpHeaders)
+ public int downloadFile(URI uri, int socketTimeoutMs, File dest,
AuthProvider authProvider, List<Header> httpHeaders)
throws IOException, HttpErrorStatusException {
- HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs,
authToken, httpHeaders);
+ HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs,
authProvider, httpHeaders);
try (CloseableHttpResponse response = _httpClient.execute(request)) {
StatusLine statusLine = response.getStatusLine();
int statusCode = statusLine.getStatusCode();
@@ -433,19 +426,6 @@ public class HttpClient implements AutoCloseable {
requestBuilder.setConfig(requestConfig);
}
- /**
- * Generate an (optional) HTTP Authorization header given an auth token.
- *
- * @param authToken auth token
- * @return list of 0 or 1 "Authorization" headers
- */
- public static List<Header> makeAuthHeader(String authToken) {
- if (StringUtils.isBlank(authToken)) {
- return Collections.emptyList();
- }
- return Collections.singletonList(new BasicHeader(AUTH_HTTP_HEADER,
authToken));
- }
-
private static String getErrorMessage(HttpUriRequest request,
CloseableHttpResponse response) {
String controllerHost = null;
String controllerVersion = null;
@@ -470,12 +450,10 @@ public class HttpClient implements AutoCloseable {
return errorMessage;
}
- private static HttpUriRequest getDownloadFileRequest(URI uri, int
socketTimeoutMs, String authToken,
+ private static HttpUriRequest getDownloadFileRequest(URI uri, int
socketTimeoutMs, AuthProvider authProvider,
List<Header> httpHeaders) {
RequestBuilder requestBuilder =
RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
- if (org.apache.commons.lang.StringUtils.isNotBlank(authToken)) {
- requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, authToken);
- }
+
AuthProviderUtils.toRequestHeaders(authProvider).forEach(requestBuilder::addHeader);
HttpClient.setTimeout(requestBuilder, socketTimeoutMs);
String userInfo = uri.getUserInfo();
if (userInfo != null) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
index 20e128da14..0dedcf7e32 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
@@ -37,6 +37,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
@@ -44,6 +45,7 @@ import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.FileIngestionHelper;
import org.apache.pinot.controller.util.FileIngestionHelper.DataPayload;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
@@ -202,17 +204,15 @@ public class PinotIngestionRestletResource {
});
Schema schema =
_pinotHelixResourceManager.getTableSchema(tableNameWithType);
+ AuthProvider authProvider =
AuthProviderUtils.extractAuthProvider(_controllerConf,
+ CommonConstants.Controller.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY
+ ".auth");
+
FileIngestionHelper fileIngestionHelper =
new FileIngestionHelper(tableConfig, schema, batchConfigMap,
getControllerUri(),
- new File(_controllerConf.getDataDir(), UPLOAD_DIR),
getAuthToken());
+ new File(_controllerConf.getDataDir(), UPLOAD_DIR), authProvider);
return fileIngestionHelper.buildSegmentAndPush(payload);
}
- private String getAuthToken() {
- return _controllerConf
-
.getProperty(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY
+ ".auth.token");
- }
-
private URI getControllerUri() {
try {
return new URI(_controllerConf.generateVipUrl());
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index 92da871460..961240edbb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -35,7 +35,7 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.controller.api.resources.SuccessResponse;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.auth.AuthContext;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
@@ -71,16 +71,16 @@ public class FileIngestionHelper {
private final Map<String, String> _batchConfigMap;
private final URI _controllerUri;
private final File _uploadDir;
- private final AuthContext _authContext;
+ private final AuthProvider _authProvider;
public FileIngestionHelper(TableConfig tableConfig, Schema schema,
Map<String, String> batchConfigMap,
- URI controllerUri, File uploadDir, String authToken) {
+ URI controllerUri, File uploadDir, AuthProvider authProvider) {
_tableConfig = tableConfig;
_schema = schema;
_batchConfigMap = batchConfigMap;
_controllerUri = controllerUri;
_uploadDir = uploadDir;
- _authContext = new AuthContext(authToken);
+ _authProvider = authProvider;
}
/**
@@ -154,7 +154,7 @@ public class FileIngestionHelper {
.setIngestionConfig(ingestionConfigOverride).build();
SegmentUploader segmentUploader =
PluginManager.get().createInstance(SEGMENT_UPLOADER_CLASS);
segmentUploader.init(tableConfigOverride);
- segmentUploader.uploadSegment(segmentTarFile.toURI(), _authContext);
+ segmentUploader.uploadSegment(segmentTarFile.toURI(), _authProvider);
LOGGER.info("Uploaded tar: {} to table: {}",
segmentTarFile.getAbsolutePath(), tableNameWithType);
return new SuccessResponse(
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 5b28f04fd3..d4eb1f9175 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -33,11 +33,14 @@ import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
@@ -61,8 +64,10 @@ import
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.slf4j.Logger;
@@ -87,7 +92,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected File _resourceTmpDir;
protected Logger _logger;
protected HelixManager _helixManager;
- protected String _authToken;
+ protected AuthProvider _authProvider;
// Fixed size LRU cache with TableName - SegmentName pair as key, and
segment related
// errors as the value.
@@ -104,7 +109,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
_propertyStore = propertyStore;
_serverMetrics = serverMetrics;
_helixManager = helixManager;
- _authToken = tableDataManagerConfig.getAuthToken();
+
+ _authProvider =
AuthProviderUtils.extractAuthProvider(toPinotConfiguration(_tableDataManagerConfig.getAuthConfig()),
+ null);
_tableNameWithType = tableDataManagerConfig.getTableName();
_tableDataDir = tableDataManagerConfig.getDataDir();
@@ -627,4 +634,11 @@ public abstract class BaseTableDataManager implements
TableDataManager {
}
}
}
+
+ private static PinotConfiguration toPinotConfiguration(Configuration
configuration) {
+ if (configuration == null) {
+ return new PinotConfiguration();
+ }
+ return new PinotConfiguration((Map<String, Object>) (Map)
ConfigurationConverter.getMap(configuration));
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 2e4a10ee2c..30e4d1d92f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -64,7 +64,7 @@ public class SegmentCommitterFactory {
segmentUploader = new Server2ControllerSegmentUploader(_logger,
_protocolHandler.getFileUploadDownloadClient(),
_protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl),
params.getSegmentName(),
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(),
_serverMetrics,
- _protocolHandler.getAuthToken());
+ _protocolHandler.getAuthProvider());
return new SplitSegmentCommitter(_logger, _protocolHandler, params,
segmentUploader);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
index 50c06fe011..2559616b49 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
@@ -21,13 +21,14 @@ package org.apache.pinot.core.data.manager.realtime;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.core.util.SegmentCompletionProtocolUtils;
import org.apache.pinot.server.realtime.ControllerLeaderLocator;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.slf4j.Logger;
@@ -40,11 +41,11 @@ public class Server2ControllerSegmentUploader implements
SegmentUploader {
private final String _segmentName;
private final int _segmentUploadRequestTimeoutMs;
private final ServerMetrics _serverMetrics;
- private final String _authToken;
+ private final AuthProvider _authProvider;
public Server2ControllerSegmentUploader(Logger segmentLogger,
FileUploadDownloadClient fileUploadDownloadClient,
String controllerSegmentUploadCommitUrl, String segmentName, int
segmentUploadRequestTimeoutMs,
- ServerMetrics serverMetrics, String authToken)
+ ServerMetrics serverMetrics, AuthProvider authProvider)
throws URISyntaxException {
_segmentLogger = segmentLogger;
_fileUploadDownloadClient = fileUploadDownloadClient;
@@ -52,7 +53,7 @@ public class Server2ControllerSegmentUploader implements
SegmentUploader {
_segmentName = segmentName;
_segmentUploadRequestTimeoutMs = segmentUploadRequestTimeoutMs;
_serverMetrics = serverMetrics;
- _authToken = authToken;
+ _authProvider = authProvider;
}
@Override
@@ -73,7 +74,7 @@ public class Server2ControllerSegmentUploader implements
SegmentUploader {
try {
String responseStr = _fileUploadDownloadClient
.uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName,
segmentFile,
- HttpClient.makeAuthHeader(_authToken), null,
_segmentUploadRequestTimeoutMs).getResponse();
+ AuthProviderUtils.toRequestHeaders(_authProvider), null,
_segmentUploadRequestTimeoutMs).getResponse();
response =
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
_segmentLogger.info("Controller response {} for {}",
response.toJsonString(), _controllerSegmentUploadCommitUrl);
if
(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER))
{
diff --git
a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index f643d6fc13..a7547e9ab1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -24,13 +24,14 @@ import java.net.URISyntaxException;
import java.util.Map;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.ClientSSLContextGenerator;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.http.HttpClient;
import
org.apache.pinot.core.data.manager.realtime.Server2ControllerSegmentUploader;
import org.apache.pinot.core.util.SegmentCompletionProtocolUtils;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -53,7 +54,7 @@ public class ServerSegmentCompletionProtocolHandler {
private static SSLContext _sslContext;
private static Integer _controllerHttpsPort;
private static int _segmentUploadRequestTimeoutMs;
- private static String _authToken;
+ private static AuthProvider _authProvider;
private static String _protocol = HTTP_PROTOCOL;
private final FileUploadDownloadClient _fileUploadDownloadClient;
@@ -73,7 +74,8 @@ public class ServerSegmentCompletionProtocolHandler {
_protocol = uploaderConfig.getProperty(CONFIG_OF_PROTOCOL, HTTP_PROTOCOL);
_segmentUploadRequestTimeoutMs = uploaderConfig
.getProperty(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS,
DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
- _authToken =
uploaderConfig.getProperty(CONFIG_OF_SEGMENT_UPLOADER_AUTH_TOKEN);
+
+ _authProvider = AuthProviderUtils.extractAuthProvider(uploaderConfig,
CONFIG_OF_SEGMENT_UPLOADER_AUTH);
}
public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics,
String tableNameWithType) {
@@ -90,8 +92,8 @@ public class ServerSegmentCompletionProtocolHandler {
return _fileUploadDownloadClient;
}
- public String getAuthToken() {
- return _authToken;
+ public AuthProvider getAuthProvider() {
+ return _authProvider;
}
public SegmentCompletionProtocol.Response
segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
@@ -157,7 +159,7 @@ public class ServerSegmentCompletionProtocolHandler {
try {
segmentUploader =
new Server2ControllerSegmentUploader(LOGGER,
_fileUploadDownloadClient, url, params.getSegmentName(),
- _segmentUploadRequestTimeoutMs, _serverMetrics, _authToken);
+ _segmentUploadRequestTimeoutMs, _serverMetrics, _authProvider);
} catch (URISyntaxException e) {
LOGGER.error("Segment commit upload url error: ", e);
return SegmentCompletionProtocol.RESP_NOT_SENT;
@@ -212,7 +214,7 @@ public class ServerSegmentCompletionProtocolHandler {
SegmentCompletionProtocol.Response response;
try {
String responseStr = _fileUploadDownloadClient
- .sendSegmentCompletionProtocolRequest(new URI(url),
HttpClient.makeAuthHeader(_authToken), null,
+ .sendSegmentCompletionProtocolRequest(new URI(url),
AuthProviderUtils.toRequestHeaders(_authProvider), null,
DEFAULT_OTHER_REQUESTS_TIMEOUT).getResponse();
response =
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
LOGGER.info("Controller response {} for {}", response.toJsonString(),
url);
@@ -237,7 +239,7 @@ public class ServerSegmentCompletionProtocolHandler {
SegmentCompletionProtocol.Response response;
try {
String responseStr = _fileUploadDownloadClient
- .uploadSegmentMetadataFiles(new URI(url), metadataFiles,
HttpClient.makeAuthHeader(_authToken),
+ .uploadSegmentMetadataFiles(new URI(url), metadataFiles,
AuthProviderUtils.toRequestHeaders(_authProvider),
null, _segmentUploadRequestTimeoutMs).getResponse();
response =
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
LOGGER.info("Controller response {} for {}", response.toJsonString(),
url);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index 7adc732b61..380011cc90 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -29,6 +30,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.configuration.MapConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -117,6 +119,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
config = mock(TableDataManagerConfig.class);
when(config.getTableName()).thenReturn(TABLE_NAME);
when(config.getDataDir()).thenReturn(_tmpDir.getAbsolutePath());
+ when(config.getAuthConfig()).thenReturn(new MapConfiguration(new
HashMap<>()));
}
tableDataManager.init(config, "dummyInstance",
mock(ZkHelixPropertyStore.class),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null, 0);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 97362fc48c..314d12a3bc 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -24,10 +24,12 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import org.apache.commons.configuration.MapConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -526,6 +528,7 @@ public class BaseTableDataManagerTest {
TableDataManagerConfig config = mock(TableDataManagerConfig.class);
when(config.getTableName()).thenReturn(TABLE_NAME);
when(config.getDataDir()).thenReturn(TABLE_DATA_DIR.getAbsolutePath());
+ when(config.getAuthConfig()).thenReturn(new
MapConfiguration(Collections.emptyMap()));
OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(config, "dummyInstance",
mock(ZkHelixPropertyStore.class),
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java
index fafa7cfd42..39ad829fda 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -158,7 +159,8 @@ public class BasicAuthBatchIntegrationTest extends
ClusterTest {
IOUtils
.write(jobFileContents.replaceAll("9000",
String.valueOf(getControllerPort())), new FileOutputStream(jobFile));
- new BootstrapTableTool("http", "localhost", getControllerPort(),
baseDir.getAbsolutePath(), AUTH_TOKEN).execute();
+ new BootstrapTableTool("http", "localhost", getControllerPort(),
baseDir.getAbsolutePath(),
+ AuthProviderUtils.makeAuthProvider(AUTH_TOKEN)).execute();
Thread.sleep(5000);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
index 22ed5fc4ac..6fe9f7b8db 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
@@ -465,6 +465,13 @@ public class TlsIntegrationTest extends
BaseClusterIntegrationTest {
}
}
+ @Test(expectedExceptions = IOException.class)
+ public void testUnauthenticatedFailure()
+ throws IOException {
+ sendDeleteRequest(
+
_controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType("mytable")));
+ }
+
@Test
public void testRealtimeSegmentUploadDownload()
throws Exception {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
similarity index 70%
rename from
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
rename to
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
index baa9972644..022d7f5c4b 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import groovy.lang.IntRange;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -31,6 +32,7 @@ import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
import org.apache.pinot.client.ResultSetGroup;
+import org.apache.pinot.common.auth.UrlAuthProvider;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -48,11 +50,12 @@ import org.testng.annotations.Test;
import static
org.apache.pinot.integration.tests.BasicAuthTestUtils.AUTH_HEADER;
-/**
- * NOTE: fully covered by TlsIntegrationTest. If that one fails for realtime
segments try this one to isolate any TLS
- * related issues.
- */
-public class BasicAuthRealtimeIntegrationTest extends
BaseClusterIntegrationTest {
+public class UrlAuthRealtimeIntegrationTest extends BaseClusterIntegrationTest
{
+ final static String AUTH_PROVIDER_CLASS =
UrlAuthProvider.class.getCanonicalName();
+ final static URL AUTH_URL =
UrlAuthRealtimeIntegrationTest.class.getResource("/url-auth-token.txt");
+ final static URL AUTH_URL_PREFIXED =
UrlAuthRealtimeIntegrationTest.class.getResource("/url-auth-token-prefixed.txt");
+ final static String AUTH_PREFIX = "Basic";
+
@BeforeClass
public void setUp()
throws Exception {
@@ -95,22 +98,49 @@ public class BasicAuthRealtimeIntegrationTest extends
BaseClusterIntegrationTest
@Override
public Map<String, Object> getDefaultControllerConfiguration() {
- return
BasicAuthTestUtils.addControllerConfiguration(super.getDefaultControllerConfiguration());
+ Map<String, Object> conf =
BasicAuthTestUtils.addControllerConfiguration(super.getDefaultControllerConfiguration());
+ conf.put("controller.segment.fetcher.auth.provider.class",
AUTH_PROVIDER_CLASS);
+ conf.put("controller.segment.fetcher.auth.url", AUTH_URL);
+ conf.put("controller.segment.fetcher.auth.prefix", AUTH_PREFIX);
+
+ return conf;
}
@Override
protected PinotConfiguration getDefaultBrokerConfiguration() {
- return
BasicAuthTestUtils.addBrokerConfiguration(super.getDefaultBrokerConfiguration().toMap());
+ PinotConfiguration conf =
BasicAuthTestUtils.addBrokerConfiguration(super.getDefaultBrokerConfiguration().toMap());
+ // no customization yet
+
+ return conf;
}
@Override
protected PinotConfiguration getDefaultServerConfiguration() {
- return
BasicAuthTestUtils.addServerConfiguration(super.getDefaultServerConfiguration().toMap());
+ PinotConfiguration conf =
BasicAuthTestUtils.addServerConfiguration(super.getDefaultServerConfiguration().toMap());
+ conf.setProperty("pinot.server.segment.fetcher.auth.provider.class",
AUTH_PROVIDER_CLASS);
+ conf.setProperty("pinot.server.segment.fetcher.auth.url", AUTH_URL);
+ conf.setProperty("pinot.server.segment.fetcher.auth.prefix", AUTH_PREFIX);
+ conf.setProperty("pinot.server.segment.uploader.auth.provider.class",
AUTH_PROVIDER_CLASS);
+ conf.setProperty("pinot.server.segment.uploader.auth.url", AUTH_URL);
+ conf.setProperty("pinot.server.segment.uploader.auth.prefix", AUTH_PREFIX);
+ conf.setProperty("pinot.server.instance.auth.provider.class",
AUTH_PROVIDER_CLASS);
+ conf.setProperty("pinot.server.instance.auth.url", AUTH_URL);
+ conf.setProperty("pinot.server.instance.auth.prefix", AUTH_PREFIX);
+
+ return conf;
}
@Override
protected PinotConfiguration getDefaultMinionConfiguration() {
- return
BasicAuthTestUtils.addMinionConfiguration(super.getDefaultMinionConfiguration().toMap());
+ PinotConfiguration conf =
BasicAuthTestUtils.addMinionConfiguration(super.getDefaultMinionConfiguration().toMap());
+ conf.setProperty("segment.fetcher.auth.provider.class",
AUTH_PROVIDER_CLASS);
+ conf.setProperty("segment.fetcher.auth.url", AUTH_URL_PREFIXED);
+ conf.setProperty("segment.fetcher.auth.prefix", AUTH_PREFIX);
+ conf.setProperty("task.auth.provider.class", AUTH_PROVIDER_CLASS);
+ conf.setProperty("task.auth.url", AUTH_URL_PREFIXED);
+ conf.setProperty("task.auth.prefix", AUTH_PREFIX);
+
+ return conf;
}
@Override
@@ -162,6 +192,13 @@ public class BasicAuthRealtimeIntegrationTest extends
BaseClusterIntegrationTest
AUTH_HEADER);
}
+ @Test(expectedExceptions = IOException.class)
+ public void testUnauthenticatedFailure()
+ throws IOException {
+ sendDeleteRequest(
+
_controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType("mytable")));
+ }
+
@Test
public void testSegmentUploadDownload()
throws Exception {
diff --git
a/pinot-integration-tests/src/test/resources/url-auth-token-prefixed.txt
b/pinot-integration-tests/src/test/resources/url-auth-token-prefixed.txt
new file mode 100644
index 0000000000..8b0a41c071
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/url-auth-token-prefixed.txt
@@ -0,0 +1 @@
+Basic YWRtaW46dmVyeXNlY3JldA==
\ No newline at end of file
diff --git a/pinot-integration-tests/src/test/resources/url-auth-token.txt
b/pinot-integration-tests/src/test/resources/url-auth-token.txt
new file mode 100644
index 0000000000..f230446cf8
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/url-auth-token.txt
@@ -0,0 +1 @@
+YWRtaW46dmVyeXNlY3JldA==
\ No newline at end of file
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 3ea7c7d28f..d33921b080 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -33,16 +33,17 @@ import org.apache.http.Header;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
-import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.exception.TaskCancelledException;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,7 +106,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
.collect(Collectors.toList());
String lineageEntryId =
SegmentConversionUtils.startSegmentReplace(context.getTableNameWithType(),
context.getUploadURL(),
- new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo),
context.getAuthToken());
+ new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo),
context.getAuthProvider());
context.setCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID,
lineageEntryId);
}
}
@@ -116,7 +117,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
if (context.isReplaceSegmentsEnabled()) {
String lineageEntryId = (String)
context.getCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID);
SegmentConversionUtils.endSegmentReplace(context.getTableNameWithType(),
context.getUploadURL(), lineageEntryId,
- _minionConf.getEndReplaceSegmentsTimeoutMs(),
context.getAuthToken());
+ _minionConf.getEndReplaceSegmentsTimeoutMs(),
context.getAuthProvider());
}
}
@@ -132,7 +133,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
String[] downloadURLs =
downloadURLString.split(MinionConstants.URL_SEPARATOR);
String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
- String authToken = configs.get(MinionConstants.AUTH_TOKEN);
+ AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
LOGGER.info("Start executing {} on table: {}, input segments: {} with
downloadURLs: {}, uploadURL: {}", taskType,
tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
@@ -215,7 +216,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
List<Header> httpHeaders = new ArrayList<>();
httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
- httpHeaders.addAll(HttpClient.makeAuthHeader(authToken));
+ httpHeaders.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
// Set parameters for upload request
NameValuePair enableParallelPushProtectionParameter =
@@ -255,7 +256,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
private final String _tableNameWithType;
private final String _uploadURL;
- private final String _authToken;
+ private final AuthProvider _authProvider;
private final String _inputSegmentNames;
private final boolean _replaceSegmentsEnabled;
private final Map<String, Object> _customMap;
@@ -268,7 +269,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
Map<String, String> configs = pinotTaskConfig.getConfigs();
_tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
_uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
- _authToken = configs.get(MinionConstants.AUTH_TOKEN);
+ _authProvider =
AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
_inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
String replaceSegmentsString =
configs.get(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY);
_replaceSegmentsEnabled = Boolean.parseBoolean(replaceSegmentsString);
@@ -291,8 +292,8 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
return _uploadURL;
}
- public String getAuthToken() {
- return _authToken;
+ public AuthProvider getAuthProvider() {
+ return _authProvider;
}
public String getInputSegmentNames() {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index 730f5f1d05..773efd83ef 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -31,14 +31,15 @@ import org.apache.http.HttpHeaders;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
-import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.exception.TaskCancelledException;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +70,7 @@ public abstract class BaseSingleSegmentConversionExecutor
extends BaseTaskExecut
String downloadURL = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
String originalSegmentCrc =
configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
- String authToken = configs.get(MinionConstants.AUTH_TOKEN);
+ AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
long currentSegmentCrc = getSegmentCrc(tableNameWithType, segmentName);
if (Long.parseLong(originalSegmentCrc) != currentSegmentCrc) {
@@ -150,7 +151,7 @@ public abstract class BaseSingleSegmentConversionExecutor
extends BaseTaskExecut
httpHeaders.add(ifMatchHeader);
httpHeaders.add(refreshOnlyHeader);
httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
- httpHeaders.addAll(HttpClient.makeAuthHeader(authToken));
+ httpHeaders.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
// Set parameters for upload request.
NameValuePair enableParallelPushProtectionParameter =
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index 87ede0f3e4..d2b4dc37b7 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -124,13 +125,14 @@ public class SegmentConversionUtils {
}
public static String startSegmentReplace(String tableNameWithType, String
uploadURL,
- StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable
String authToken)
+ StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable
AuthProvider authProvider)
throws Exception {
- return startSegmentReplace(tableNameWithType, uploadURL,
startReplaceSegmentsRequest, authToken, true);
+ return startSegmentReplace(tableNameWithType, uploadURL,
startReplaceSegmentsRequest, authProvider, true);
}
public static String startSegmentReplace(String tableNameWithType, String
uploadURL,
- StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable
String authToken, boolean forceCleanup)
+ StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable
AuthProvider authProvider,
+ boolean forceCleanup)
throws Exception {
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -139,17 +141,18 @@ public class SegmentConversionUtils {
URI uri = FileUploadDownloadClient
.getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName,
tableType.name(), forceCleanup);
SimpleHttpResponse response =
- fileUploadDownloadClient.startReplaceSegments(uri,
startReplaceSegmentsRequest, authToken);
+ fileUploadDownloadClient.startReplaceSegments(uri,
startReplaceSegmentsRequest, authProvider);
String responseString = response.getResponse();
LOGGER.info(
- "Got response {}: {} while sending start replace segment request for
table: {}, uploadURL: {}, request: {}",
+ "Got response {}: {} while sending start replace segment
reBaseSingleSegmentConversionExecutorquest for "
+ + "table: {}, uploadURL: {}, request: {}",
response.getStatusCode(), responseString, tableNameWithType,
uploadURL, startReplaceSegmentsRequest);
return
JsonUtils.stringToJsonNode(responseString).get("segmentLineageEntryId").asText();
}
}
public static void endSegmentReplace(String tableNameWithType, String
uploadURL, String segmentLineageEntryId,
- int socketTimeoutMs, @Nullable String authToken)
+ int socketTimeoutMs, @Nullable AuthProvider authProvider)
throws Exception {
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -157,7 +160,7 @@ public class SegmentConversionUtils {
try (FileUploadDownloadClient fileUploadDownloadClient = new
FileUploadDownloadClient(sslContext)) {
URI uri = FileUploadDownloadClient
.getEndReplaceSegmentsURI(new URI(uploadURL), rawTableName,
tableType.name(), segmentLineageEntryId);
- SimpleHttpResponse response =
fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs, authToken);
+ SimpleHttpResponse response =
fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs, authProvider);
LOGGER.info("Got response {}: {} while sending end replace segment
request for table: {}, uploadURL: {}",
response.getStatusCode(), response.getResponse(), tableNameWithType,
uploadURL);
}
diff --git
a/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java
b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java
index c836e49924..81f538c7f7 100644
---
a/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java
+++
b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java
@@ -29,7 +29,7 @@ import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.segment.local.utils.IngestionUtils;
-import org.apache.pinot.spi.auth.AuthContext;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.BatchConfig;
@@ -86,15 +86,15 @@ public class SegmentUploaderDefault implements
SegmentUploader {
}
@Override
- public void uploadSegment(URI segmentTarURI, @Nullable AuthContext
authContext)
+ public void uploadSegment(URI segmentTarURI, @Nullable AuthProvider
authProvider)
throws Exception {
IngestionUtils
- .uploadSegment(_tableNameWithType, _batchConfig,
Collections.singletonList(segmentTarURI), authContext);
+ .uploadSegment(_tableNameWithType, _batchConfig,
Collections.singletonList(segmentTarURI), authProvider);
LOGGER.info("Successfully uploaded segment: {} to table: {}",
segmentTarURI, _tableNameWithType);
}
@Override
- public void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthContext
authContext)
+ public void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthProvider
authProvider)
throws Exception {
List<URI> segmentTarURIs = new ArrayList<>();
@@ -106,7 +106,7 @@ public class SegmentUploaderDefault implements
SegmentUploader {
segmentTarURIs.add(uri);
}
}
- IngestionUtils.uploadSegment(_tableNameWithType, _batchConfig,
segmentTarURIs, authContext);
+ IngestionUtils.uploadSegment(_tableNameWithType, _batchConfig,
segmentTarURIs, authProvider);
LOGGER.info("Successfully uploaded segments: {} to table: {}",
segmentTarURIs, _tableNameWithType);
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java
index f393338323..b0baa40e3b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java
@@ -36,7 +36,7 @@ public class TableDataManagerConfig {
private static final String TABLE_DATA_MANAGER_CONSUMER_DIRECTORY =
"consumerDirectory";
private static final String TABLE_DATA_MANAGER_NAME = "name";
private static final String TABLE_IS_DIMENSION = "isDimTable";
- private static final String TABLE_DATA_MANGER_AUTH_TOKEN = "authToken";
+ private static final String TABLE_DATA_MANAGER_AUTH = "auth";
private final Configuration _tableDataManagerConfig;
@@ -68,8 +68,8 @@ public class TableDataManagerConfig {
return _tableDataManagerConfig.getBoolean(TABLE_IS_DIMENSION);
}
- public String getAuthToken() {
- return _tableDataManagerConfig.getString(TABLE_DATA_MANGER_AUTH_TOKEN);
+ public Configuration getAuthConfig() {
+ return _tableDataManagerConfig.subset(TABLE_DATA_MANAGER_AUTH);
}
public static TableDataManagerConfig getDefaultHelixTableDataManagerConfig(
@@ -82,16 +82,18 @@ public class TableDataManagerConfig {
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
Preconditions.checkNotNull(tableType);
defaultConfig.addProperty(TABLE_DATA_MANAGER_TYPE, tableType.name());
- defaultConfig.addProperty(TABLE_DATA_MANGER_AUTH_TOKEN,
instanceDataManagerConfig.getAuthToken());
+
+ // copy auth-related configs
+
instanceDataManagerConfig.getConfig().subset(TABLE_DATA_MANAGER_AUTH).toMap()
+ .forEach((key, value) ->
defaultConfig.setProperty(TABLE_DATA_MANAGER_AUTH + "." + key, value));
return new TableDataManagerConfig(defaultConfig);
}
- public void overrideConfigs(TableConfig tableConfig, String authToken) {
+ public void overrideConfigs(TableConfig tableConfig) {
// Override table level configs
_tableDataManagerConfig.addProperty(TABLE_IS_DIMENSION,
tableConfig.isDimTable());
- _tableDataManagerConfig.addProperty(TABLE_DATA_MANGER_AUTH_TOKEN,
authToken);
// If we wish to override some table level configs using table config,
override them here
// Note: the configs in TableDataManagerConfig is immutable once the table
is created, which mean it will not pick
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
index 0e38807cc5..eea20f0687 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.segment.local.function.FunctionEvaluator;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
@@ -38,7 +39,7 @@ import
org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator;
import
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
-import org.apache.pinot.spi.auth.AuthContext;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
@@ -188,13 +189,14 @@ public final class IngestionUtils {
* @param tableNameWithType name of the table to upload the segment
* @param batchConfig batchConfig with details about push such as
controllerURI, pushAttempts, pushParallelism, etc
* @param segmentTarURIs list of URI for the segment tar files
- * @param authContext auth details required to upload the Pinot segment to
controller
+ * @param authProvider auth provider
*/
public static void uploadSegment(String tableNameWithType, BatchConfig
batchConfig, List<URI> segmentTarURIs,
- @Nullable AuthContext authContext)
+ @Nullable AuthProvider authProvider)
throws Exception {
- SegmentGenerationJobSpec segmentUploadSpec =
generateSegmentUploadSpec(tableNameWithType, batchConfig, authContext);
+ SegmentGenerationJobSpec segmentUploadSpec =
generateSegmentUploadSpec(tableNameWithType, batchConfig,
+ authProvider);
List<String> segmentTarURIStrs =
segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList());
String pushMode = batchConfig.getPushMode();
@@ -249,7 +251,7 @@ public final class IngestionUtils {
}
private static SegmentGenerationJobSpec generateSegmentUploadSpec(String
tableName, BatchConfig batchConfig,
- @Nullable AuthContext authContext) {
+ @Nullable AuthProvider authProvider) {
TableSpec tableSpec = new TableSpec();
tableSpec.setTableName(tableName);
@@ -269,9 +271,8 @@ public final class IngestionUtils {
spec.setPushJobSpec(pushJobSpec);
spec.setTableSpec(tableSpec);
spec.setPinotClusterSpecs(pinotClusterSpecs);
- if (authContext != null &&
StringUtils.isNotBlank(authContext.getAuthToken())) {
- spec.setAuthToken(authContext.getAuthToken());
- }
+ spec.setAuthToken(AuthProviderUtils.toStaticToken(authProvider));
+
return spec;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
index d81143a0f3..1d91e5bb4e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
@@ -37,12 +37,14 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -105,6 +107,7 @@ public class SegmentPushUtils implements Serializable {
String fileName = tarFile.getName();
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
String segmentName = fileName.substring(0, fileName.length() -
Constants.TAR_GZ_FILE_EXT.length());
+ AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
URI controllerURI;
try {
@@ -125,7 +128,7 @@ public class SegmentPushUtils implements Serializable {
try (InputStream inputStream = fileSystem.open(tarFileURI)) {
SimpleHttpResponse response =
FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
- segmentName, inputStream,
HttpClient.makeAuthHeader(spec.getAuthToken()),
+ segmentName, inputStream,
AuthProviderUtils.toRequestHeaders(authProvider),
FileUploadDownloadClient.makeTableParam(tableName),
tableName, tableType);
LOGGER.info("Response for pushing table {} segment {} to location
{} - {}: {}", tableName, segmentName,
controllerURI, response.getStatusCode(),
response.getResponse());
@@ -162,6 +165,7 @@ public class SegmentPushUtils implements Serializable {
for (String segmentUri : segmentUris) {
URI segmentURI = URI.create(segmentUri);
PinotFS outputDirFS = PinotFSFactory.create(segmentURI.getScheme());
+ AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
URI controllerURI;
try {
@@ -182,9 +186,8 @@ public class SegmentPushUtils implements Serializable {
try {
SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
.sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
segmentUri,
- HttpClient.makeAuthHeader(spec.getAuthToken()),
- FileUploadDownloadClient.makeTableParam(tableName),
- HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ AuthProviderUtils.toRequestHeaders(authProvider),
+ FileUploadDownloadClient.makeTableParam(tableName),
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
LOGGER.info("Response for pushing table {} segment uri {} to
location {} - {}: {}", tableName, segmentUri,
controllerURI, response.getStatusCode(),
response.getResponse());
return true;
@@ -237,6 +240,7 @@ public class SegmentPushUtils implements Serializable {
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
String segmentName = fileName.substring(0, fileName.length() -
Constants.TAR_GZ_FILE_EXT.length());
File segmentMetadataFile = generateSegmentMetadataFile(fileSystem,
URI.create(tarFilePath));
+ AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
try {
for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
URI controllerURI;
@@ -260,7 +264,7 @@ public class SegmentPushUtils implements Serializable {
headers.add(new
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
segmentUriPath));
headers.add(new
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
FileUploadDownloadClient.FileUploadType.METADATA.toString()));
- headers.addAll(HttpClient.makeAuthHeader(spec.getAuthToken()));
+ headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
.uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
segmentName,
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index e2454cdf47..4ee010ee3b 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -64,7 +64,6 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,7 +85,6 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
private HelixManager _helixManager;
private ServerMetrics _serverMetrics;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
- private String _authToken;
private SegmentUploader _segmentUploader;
// Fixed size LRU cache for storing last N errors on the instance.
@@ -103,7 +101,6 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
_instanceId = _instanceDataManagerConfig.getInstanceId();
_helixManager = helixManager;
_serverMetrics = serverMetrics;
- _authToken =
config.getProperty(CommonConstants.Server.CONFIG_OF_AUTH_TOKEN);
_segmentUploader = new
PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(),
PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
@@ -180,7 +177,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
LOGGER.info("Creating table data manager for table: {}",
tableNameWithType);
TableDataManagerConfig tableDataManagerConfig =
TableDataManagerConfig.getDefaultHelixTableDataManagerConfig(_instanceDataManagerConfig,
tableNameWithType);
- tableDataManagerConfig.overrideConfigs(tableConfig, _authToken);
+ tableDataManagerConfig.overrideConfigs(tableConfig);
TableDataManager tableDataManager =
TableDataManagerProvider.getTableDataManager(tableDataManagerConfig,
_instanceId, _propertyStore,
_serverMetrics, _helixManager, _errorCache);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index b9ea877404..0d7a2a213e 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -62,8 +62,6 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
public static final String SEGMENT_FORMAT_VERSION = "segment.format.version";
// Key of whether to enable reloading consuming segments
public static final String INSTANCE_RELOAD_CONSUMING_SEGMENT =
"reload.consumingSegment";
- // Key of the auth token
- public static final String AUTH_TOKEN = "auth.token";
// Key of segment directory loader
public static final String SEGMENT_DIRECTORY_LOADER =
"segment.directory.loader";
@@ -224,12 +222,6 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS);
}
- @Override
- public String getAuthToken() {
- return _instanceDataManagerConfiguration.getProperty(AUTH_TOKEN);
- }
-
- @Override
public String getSegmentDirectoryLoader() {
return
_instanceDataManagerConfiguration.getProperty(SEGMENT_DIRECTORY_LOADER,
SegmentDirectoryLoaderRegistry.DEFAULT_SEGMENT_DIRECTORY_LOADER_NAME);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthProvider.java
similarity index 62%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthProvider.java
index 5a9798c355..a646a33a71 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthProvider.java
@@ -18,17 +18,18 @@
*/
package org.apache.pinot.spi.auth;
+import java.util.Map;
+
+
/**
- * Container for all auth related info
+ * Pluggable auth provider interface to augment authentication information in
requests issued by pinot.
+ *
+ * Comes with several default implementation, including noop, static tokens,
and token loaded from external urls.
+ * The purpose of AuthProvider is enabling dynamic reconfiguration of pinot's
internal auth tokens, for example with
+ * expiring JWTs and other token rotation mechanisms.
*/
-public class AuthContext {
- private final String _authToken;
-
- public AuthContext(String authToken) {
- _authToken = authToken;
- }
+public interface AuthProvider {
+ Map<String, Object> getRequestHeaders();
- public String getAuthToken() {
- return _authToken;
- }
+ String getTaskToken();
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index baa29861ca..f003d441f4 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -55,8 +55,6 @@ public interface InstanceDataManagerConfig {
int getMaxParallelSegmentDownloads();
- String getAuthToken();
-
String getSegmentDirectoryLoader();
long getErrorCacheSize();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
index 15fc9fbcd8..145f443a4a 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
@@ -124,6 +124,12 @@ public class SegmentGenerationJobSpec implements
Serializable {
/**
* Controller auth token
+ *
+ * <br/>NOTE: jobs MUST NOT allow references to external tokens via URL or
path to prevent:
+ * (a) file system crawling
+ * (b) unauthorized injection of system tokens from the server's local file
system.
+ *
+ * Instead, resolve tokens right when the job command is run. This allows
injection of client-local credentials.
*/
private String _authToken;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java
index e723b4c987..e48cd63354 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java
@@ -22,7 +22,7 @@ import java.net.URI;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.spi.annotations.InterfaceStability;
-import org.apache.pinot.spi.auth.AuthContext;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -49,17 +49,17 @@ public interface SegmentUploader {
/**
* Uploads the segment tar file to the cluster
* @param segmentTarFile URI of segment tar file
- * @param authContext auth details required to upload pinot segment to
controller
+ * @param authProvider auth provider
*/
- void uploadSegment(URI segmentTarFile, @Nullable AuthContext authContext)
+ void uploadSegment(URI segmentTarFile, @Nullable AuthProvider authProvider)
throws Exception;
/**
* Uploads the segments from the segmentDir to the cluster.
* Looks for segmentTar files recursively, with suffix .tar.gz
* @param segmentDir URI of directory containing segment tar files
- * @param authContext auth details required to upload pinot segment to
controller
+ * @param authProvider auth auth provider
*/
- void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthContext authContext)
+ void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthProvider
authProvider)
throws Exception;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 2fbb323bd4..c0fc4ebf5d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -33,7 +33,7 @@ public class CommonConstants {
public static final String HTTP_PROTOCOL = "http";
public static final String HTTPS_PROTOCOL = "https";
- public static final String KEY_OF_AUTH_TOKEN = "auth.token";
+ public static final String KEY_OF_AUTH = "auth";
public static final String TABLE_NAME = "tableName";
@@ -337,7 +337,7 @@ public class CommonConstants {
* Service token for accessing protected controller APIs.
* E.g. null (auth disabled), "Basic abcdef..." (basic auth), "Bearer
123def..." (oauth2)
*/
- public static final String CONFIG_OF_AUTH_TOKEN = KEY_OF_AUTH_TOKEN;
+ public static final String CONFIG_OF_AUTH = KEY_OF_AUTH;
// Configuration to consider the server ServiceStatus as being STARTED if
the percent of resources (tables) that
// are ONLINE for this this server has crossed the threshold percentage of
the total number of tables
@@ -438,7 +438,7 @@ public class CommonConstants {
* Service token for accessing protected controller APIs.
* E.g. null (auth disabled), "Basic abcdef..." (basic auth), "Bearer
123def..." (oauth2)
*/
- public static final String CONFIG_OF_SEGMENT_UPLOADER_AUTH_TOKEN =
KEY_OF_AUTH_TOKEN;
+ public static final String CONFIG_OF_SEGMENT_UPLOADER_AUTH = KEY_OF_AUTH;
public static final int DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS =
300_000;
public static final int DEFAULT_OTHER_REQUESTS_TIMEOUT = 10_000;
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/AuthQuickstart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/AuthQuickstart.java
index 00ec73d5af..ff2ea7d047 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/AuthQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/AuthQuickstart.java
@@ -22,7 +22,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.core.auth.BasicAuthUtils;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -33,8 +35,8 @@ public class AuthQuickstart extends Quickstart {
}
@Override
- public String getAuthToken() {
- return BasicAuthUtils.toBasicAuthToken("admin", "verysecret");
+ public AuthProvider getAuthProvider() {
+ return
AuthProviderUtils.makeAuthProvider(BasicAuthUtils.toBasicAuthToken("admin",
"verysecret"));
}
@Override
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
index ebcdb377d2..c186e67cbc 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -30,9 +30,11 @@ import java.net.URL;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.minion.MinionClient;
import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
@@ -53,12 +55,12 @@ public class BootstrapTableTool {
private final String _controllerProtocol;
private final String _controllerHost;
private final int _controllerPort;
- private final String _authToken;
+ private final AuthProvider _authProvider;
private final String _tableDir;
private final MinionClient _minionClient;
public BootstrapTableTool(String controllerProtocol, String controllerHost,
int controllerPort, String tableDir,
- String authToken) {
+ AuthProvider authProvider) {
Preconditions.checkNotNull(controllerProtocol);
Preconditions.checkNotNull(controllerHost);
Preconditions.checkNotNull(tableDir);
@@ -67,7 +69,7 @@ public class BootstrapTableTool {
_controllerPort = controllerPort;
_tableDir = tableDir;
_minionClient = new MinionClient(controllerHost,
String.valueOf(controllerPort));
- _authToken = authToken;
+ _authProvider = authProvider;
}
public boolean execute()
@@ -117,7 +119,7 @@ public class BootstrapTableTool {
return new AddTableCommand().setSchemaFile(schemaFile.getAbsolutePath())
.setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerProtocol(_controllerProtocol)
.setControllerHost(_controllerHost).setControllerPort(String.valueOf(_controllerPort)).setExecute(true)
- .setAuthToken(_authToken).execute();
+ .setAuthProvider(_authProvider).execute();
}
private boolean bootstrapOfflineTable(File setupTableTmpDir, String
tableName, File schemaFile,
@@ -179,7 +181,8 @@ public class BootstrapTableTool {
tlsSpec.getTrustStorePassword());
}
- spec.setAuthToken(_authToken);
+ // url-based token needs to be resolved before job run
+ spec.setAuthToken(AuthProviderUtils.toStaticToken(_authProvider));
IngestionJobLauncher.runIngestionJob(spec);
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/EmptyQuickstart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/EmptyQuickstart.java
index d9830a657a..24c3fcc35e 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/EmptyQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/EmptyQuickstart.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
@@ -32,7 +33,7 @@ public class EmptyQuickstart extends QuickStartBase {
return Arrays.asList("EMPTY", "DEFAULT");
}
- public String getAuthToken() {
+ public AuthProvider getAuthProvider() {
return null;
}
@@ -48,7 +49,7 @@ public class EmptyQuickstart extends QuickStartBase {
QuickstartRunner runner =
new QuickstartRunner(new ArrayList<>(), 1, 1, 1, 1,
- dataDir, true, getAuthToken(), getConfigOverrides(),
_zkExternalAddress, false);
+ dataDir, true, getAuthProvider(), getConfigOverrides(),
_zkExternalAddress, false);
if (_zkExternalAddress != null) {
printStatus(Quickstart.Color.CYAN, "***** Starting controller, broker
and server *****");
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
index c3cd2b40ac..d55ab66427 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
@@ -56,7 +57,7 @@ public class Quickstart extends QuickStartBase {
}
}
- public String getAuthToken() {
+ public AuthProvider getAuthProvider() {
return null;
}
@@ -99,7 +100,7 @@ public class Quickstart extends QuickStartBase {
QuickstartTableRequest request = new
QuickstartTableRequest(baseDir.getAbsolutePath());
QuickstartRunner runner =
- new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir,
true, getAuthToken(),
+ new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir,
true, getAuthProvider(),
getConfigOverrides(), null, true);
printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and
server *****");
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
index e3b1000ad7..85da5d31a4 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java
@@ -35,8 +35,12 @@ import javax.annotation.Nullable;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
-import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.auth.NullAuthProvider;
+import org.apache.pinot.common.auth.StaticTokenAuthProvider;
+import org.apache.pinot.common.auth.UrlAuthProvider;
import org.apache.pinot.core.auth.BasicAuthUtils;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.tools.AbstractBaseCommand;
import org.apache.pinot.tools.utils.PinotConfigUtils;
@@ -125,34 +129,44 @@ public class AbstractBaseAdminCommand extends
AbstractBaseCommand {
}
/**
- * Generate an (optional) HTTP Authorization header given an auth token
- * @see HttpClient#makeAuthHeader(String)
+ * Generate an (optional) HTTP Authorization header given an auth config
*
- * @param authToken auth token
- * @return list of 0 or 1 "Authorization" headers
+ * @param authProvider auth provider
+ * @return list of headers
*/
- static List<Header> makeAuthHeader(String authToken) {
- return HttpClient.makeAuthHeader(authToken);
+ static List<Header> makeAuthHeaders(AuthProvider authProvider) {
+ return AuthProviderUtils.toRequestHeaders(authProvider);
}
/**
* Generate auth token from pass-thru token or generate basic auth from
user/password pair
*
+ * @param provider optional provider
+ * @param tokenUrl optional token url
* @param authToken optional pass-thru token
* @param user optional username
* @param password optional password
- * @return auth token, or null if neither pass-thru token nor user info
available
+ * @return auth provider, or NullauthProvider if neither pass-thru token nor
user info available
*/
@Nullable
- static String makeAuthToken(String authToken, String user, String password) {
+ static AuthProvider makeAuthProvider(AuthProvider provider, String tokenUrl,
String authToken, String user,
+ String password) {
+ if (provider != null) {
+ return provider;
+ }
+
+ if (StringUtils.isNotBlank(tokenUrl)) {
+ return new UrlAuthProvider(tokenUrl);
+ }
+
if (StringUtils.isNotBlank(authToken)) {
- return authToken;
+ return new StaticTokenAuthProvider(authToken);
}
if (StringUtils.isNotBlank(user)) {
- return BasicAuthUtils.toBasicAuthToken(user, password);
+ return new StaticTokenAuthProvider(BasicAuthUtils.toBasicAuthToken(user,
password));
}
- return null;
+ return new NullAuthProvider();
}
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
index 83fe31eea5..18d1dfd6c4 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.util.Collections;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;
@@ -59,10 +60,15 @@ public class AddSchemaCommand extends
AbstractBaseAdminCommand implements Comman
@CommandLine.Option(names = {"-authToken"}, required = false, description =
"Http auth token.")
private String _authToken;
+ @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description
= "Http auth token url.")
+ private String _authTokenUrl;
+
@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required =
false, help = true,
description = "Print this message.")
private boolean _help = false;
+ private AuthProvider _authProvider;
+
@Override
public boolean getHelp() {
return _help;
@@ -119,8 +125,8 @@ public class AddSchemaCommand extends
AbstractBaseAdminCommand implements Comman
_password = password;
}
- public void setAuthToken(String authToken) {
- _authToken = authToken;
+ public void setAuthProvider(AuthProvider authProvider) {
+ _authProvider = authProvider;
}
public AddSchemaCommand setExecute(boolean exec) {
@@ -151,8 +157,8 @@ public class AddSchemaCommand extends
AbstractBaseAdminCommand implements Comman
try (FileUploadDownloadClient fileUploadDownloadClient = new
FileUploadDownloadClient()) {
fileUploadDownloadClient.addSchema(FileUploadDownloadClient
.getUploadSchemaURI(_controllerProtocol, _controllerHost,
Integer.parseInt(_controllerPort)),
- schema.getSchemaName(), schemaFile,
makeAuthHeader(makeAuthToken(_authToken, _user, _password)),
- Collections.emptyList());
+ schema.getSchemaName(), schemaFile,
makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken,
+ _user, _password)), Collections.emptyList());
} catch (Exception e) {
LOGGER.error("Got Exception to upload Pinot Schema: " +
schema.getSchemaName(), e);
return false;
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
index ea423d972d..c4d149f176 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Collections;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -72,12 +73,17 @@ public class AddTableCommand extends
AbstractBaseAdminCommand implements Command
@CommandLine.Option(names = {"-authToken"}, required = false, description =
"Http auth token.")
private String _authToken;
+ @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description
= "Http auth token url.")
+ private String _authTokenUrl;
+
@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required =
false, help = true,
description = "Print this message.")
private boolean _help = false;
private String _controllerAddress;
+ private AuthProvider _authProvider;
+
@Override
public boolean getHelp() {
return _help;
@@ -141,13 +147,13 @@ public class AddTableCommand extends
AbstractBaseAdminCommand implements Command
return this;
}
- public AddTableCommand setAuthToken(String authToken) {
- _authToken = authToken;
+ public AddTableCommand setExecute(boolean exec) {
+ _exec = exec;
return this;
}
- public AddTableCommand setExecute(boolean exec) {
- _exec = exec;
+ public AddTableCommand setAuthProvider(AuthProvider authProvider) {
+ _authProvider = authProvider;
return this;
}
@@ -165,8 +171,8 @@ public class AddTableCommand extends
AbstractBaseAdminCommand implements Command
try (FileUploadDownloadClient fileUploadDownloadClient = new
FileUploadDownloadClient()) {
fileUploadDownloadClient.addSchema(FileUploadDownloadClient
.getUploadSchemaURI(_controllerProtocol, _controllerHost,
Integer.parseInt(_controllerPort)),
- schema.getSchemaName(), schemaFile,
makeAuthHeader(makeAuthToken(_authToken, _user, _password)),
- Collections.emptyList());
+ schema.getSchemaName(), schemaFile,
makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken,
+ _user, _password)), Collections.emptyList());
} catch (Exception e) {
LOGGER.error("Got Exception to upload Pinot Schema: " +
schema.getSchemaName(), e);
throw e;
@@ -177,7 +183,7 @@ public class AddTableCommand extends
AbstractBaseAdminCommand implements Command
throws IOException {
String res = AbstractBaseAdminCommand
.sendRequest("POST",
ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableCreate(),
node.toString(),
- makeAuthHeader(makeAuthToken(_authToken, _user, _password)));
+ makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl,
_authToken, _user, _password)));
LOGGER.info(res);
return res.contains("succesfully added");
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
index 7001d38ca5..53203ad21f 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.tools.admin.command;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -71,12 +72,17 @@ public class AddTenantCommand extends
AbstractBaseAdminCommand implements Comman
@CommandLine.Option(names = {"-authToken"}, required = false, description =
"Http auth token.")
private String _authToken;
+ @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description
= "Http auth token url.")
+ private String _authTokenUrl;
+
@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required =
false, help = true,
description = "Print this message.")
private boolean _help = false;
private String _controllerAddress;
+ private AuthProvider _authProvider;
+
public AddTenantCommand setControllerUrl(String url) {
_controllerAddress = url;
return this;
@@ -117,13 +123,13 @@ public class AddTenantCommand extends
AbstractBaseAdminCommand implements Comman
return this;
}
- public AddTenantCommand setAuthToken(String authToken) {
- _authToken = authToken;
+ public AddTenantCommand setExecute(boolean exec) {
+ _exec = exec;
return this;
}
- public AddTenantCommand setExecute(boolean exec) {
- _exec = exec;
+ public AddTenantCommand setAuthProvider(AuthProvider authProvider) {
+ _authProvider = authProvider;
return this;
}
@@ -147,7 +153,8 @@ public class AddTenantCommand extends
AbstractBaseAdminCommand implements Comman
Tenant tenant = new Tenant(_role, _name, _instanceCount,
_offlineInstanceCount, _realtimeInstanceCount);
String res = AbstractBaseAdminCommand
.sendRequest("POST",
ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTenantCreate(),
- tenant.toJsonString(), makeAuthHeader(makeAuthToken(_authToken,
_user, _password)));
+ tenant.toJsonString(),
makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken,
_user,
+ _password)));
LOGGER.info(res);
System.out.print(res);
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
index a6accd2b14..2e6b95b9c7 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.tools.admin.command;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;
@@ -87,10 +88,15 @@ public class BootstrapTableCommand extends
AbstractBaseAdminCommand implements C
@CommandLine.Option(names = {"-authToken"}, required = false, description =
"Http auth token.")
private String _authToken;
+ @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description
= "Http auth token url.")
+ private String _authTokenUrl;
+
@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required =
false, help = true,
description = "Print this message.")
private boolean _help = false;
+ private AuthProvider _authProvider;
+
@Override
public boolean getHelp() {
return _help;
@@ -106,6 +112,11 @@ public class BootstrapTableCommand extends
AbstractBaseAdminCommand implements C
return this;
}
+ public BootstrapTableCommand setAuthProvider(AuthProvider authProvider) {
+ _authProvider = authProvider;
+ return this;
+ }
+
@Override
public String toString() {
return ("BootstrapTable -dir " + _dir);
@@ -127,8 +138,7 @@ public class BootstrapTableCommand extends
AbstractBaseAdminCommand implements C
if (_controllerHost == null) {
_controllerHost = NetUtils.getHostAddress();
}
- String token = makeAuthToken(_authToken, _user, _password);
- return new BootstrapTableTool(_controllerProtocol, _controllerHost,
Integer.parseInt(_controllerPort), _dir, token)
- .execute();
+ return new BootstrapTableTool(_controllerProtocol, _controllerHost,
Integer.parseInt(_controllerPort), _dir,
+ makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user,
_password)).execute();
}
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
index d098935b3a..855cda9704 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java
@@ -21,7 +21,7 @@ package org.apache.pinot.tools.admin.command;
import java.net.URI;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.tools.Command;
@@ -58,10 +58,20 @@ public class ChangeTableState extends
AbstractBaseAdminCommand implements Comman
@CommandLine.Option(names = {"-authToken"}, required = false, description =
"Http auth token.")
private String _authToken;
+ @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description
= "Http auth token url.")
+ private String _authTokenUrl;
+
@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required =
false, help = true,
description = "Print this message.")
private boolean _help = false;
+ private AuthProvider _authProvider;
+
+ public ChangeTableState setAuthProvider(AuthProvider authProvider) {
+ _authProvider = authProvider;
+ return this;
+ }
+
@Override
public boolean execute()
throws Exception {
@@ -78,12 +88,10 @@ public class ChangeTableState extends
AbstractBaseAdminCommand implements Comman
URI uri = new URI(_controllerProtocol, null, _controllerHost,
Integer.parseInt(_controllerPort),
URI_TABLES_PATH + _tableName, "state=" + stateValue, null);
- String token = makeAuthToken(_authToken, _user, _password);
-
GetMethod httpGet = new GetMethod(uri.toString());
- if (StringUtils.isNotBlank(token)) {
- httpGet.setRequestHeader("Authorization", token);
- }
+ makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken,
_user, _password))
+ .forEach(header -> httpGet.addRequestHeader(header.getName(),
header.getValue()));
+
int status = httpClient.executeMethod(httpGet);
if (status != 200) {
throw new RuntimeException("Failed to change table state, error: " +
httpGet.getResponseBodyAsString());
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
index b5d907e7f7..696058ef22 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
@@ -85,6 +86,9 @@ public class ImportDataCommand extends
AbstractBaseAdminCommand implements Comma
@CommandLine.Option(names = {"-authToken"}, required = false, description =
"Http auth token.")
private String _authToken;
+ @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description
= "Http auth token url.")
+ private String _authTokenUrl;
+
@CommandLine.Option(names = {"-tempDir"},
description = "Temporary directory used to hold data during segment
creation.")
private String _tempDir = new File(FileUtils.getTempDirectory(),
getClass().getSimpleName()).getAbsolutePath();
@@ -96,6 +100,8 @@ public class ImportDataCommand extends
AbstractBaseAdminCommand implements Comma
@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, help = true,
description = "Print this message.")
private boolean _help = false;
+ private AuthProvider _authProvider;
+
public ImportDataCommand setDataFilePath(String dataFilePath) {
_dataFilePath = dataFilePath;
return this;
@@ -143,8 +149,8 @@ public class ImportDataCommand extends
AbstractBaseAdminCommand implements Comma
return this;
}
- public ImportDataCommand setAuthToken(String authToken) {
- _authToken = authToken;
+ public ImportDataCommand setAuthProvider(AuthProvider authProvider) {
+ _authProvider = authProvider;
return this;
}
@@ -253,7 +259,7 @@ public class ImportDataCommand extends
AbstractBaseAdminCommand implements Comma
spec.setCleanUpOutputDir(true);
spec.setOverwriteOutput(true);
spec.setJobType("SegmentCreationAndTarPush");
- spec.setAuthToken(makeAuthToken(_authToken, _user, _password));
+ spec.setAuthToken(makeAuthProvider(_authProvider, _authTokenUrl,
_authToken, _user, _password).getTaskToken());
// set ExecutionFrameworkSpec
ExecutionFrameworkSpec executionFrameworkSpec = new
ExecutionFrameworkSpec();
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
index 48c58a3949..c21f312104 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.utils.TlsUtils;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TlsSpec;
@@ -58,6 +59,10 @@ public class LaunchDataIngestionJobCommand extends
AbstractBaseAdminCommand impl
private String _password;
@CommandLine.Option(names = {"-authToken"}, required = false, description =
"Http auth token.")
private String _authToken;
+ @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description
= "Http auth token url.")
+ private String _authTokenUrl;
+
+ private AuthProvider _authProvider;
public String getJobSpecFile() {
return _jobSpecFile;
@@ -83,6 +88,10 @@ public class LaunchDataIngestionJobCommand extends
AbstractBaseAdminCommand impl
_propertyFile = propertyFile;
}
+ public void setAuthProvider(AuthProvider authProvider) {
+ _authProvider = authProvider;
+ }
+
@Override
public boolean getHelp() {
return _help;
@@ -114,7 +123,7 @@ public class LaunchDataIngestionJobCommand extends
AbstractBaseAdminCommand impl
}
if (StringUtils.isBlank(spec.getAuthToken())) {
- spec.setAuthToken(makeAuthToken(_authToken, _user, _password));
+ spec.setAuthToken(makeAuthProvider(_authProvider, _authToken, _user,
_password, _authTokenUrl).getTaskToken());
}
try {
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
index 35fe6dbd27..37868d85d5 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.http.Header;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.NetUtils;
@@ -55,6 +56,9 @@ public class OperateClusterConfigCommand extends
AbstractBaseAdminCommand implem
@CommandLine.Option(names = {"-authToken"}, required = false, description =
"Http auth token.")
private String _authToken;
+ @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description
= "Http auth token url.")
+ private String _authTokenUrl;
+
@CommandLine.Option(names = {"-config"}, description = "Cluster config to
operate.")
private String _config;
@@ -66,6 +70,8 @@ public class OperateClusterConfigCommand extends
AbstractBaseAdminCommand implem
description = "Print this message.")
private boolean _help = false;
+ private AuthProvider _authProvider;
+
@Override
public boolean getHelp() {
return _help;
@@ -122,11 +128,6 @@ public class OperateClusterConfigCommand extends
AbstractBaseAdminCommand implem
return this;
}
- public OperateClusterConfigCommand setAuthToken(String authToken) {
- _authToken = authToken;
- return this;
- }
-
public OperateClusterConfigCommand setConfig(String config) {
_config = config;
return this;
@@ -137,6 +138,11 @@ public class OperateClusterConfigCommand extends
AbstractBaseAdminCommand implem
return this;
}
+ public OperateClusterConfigCommand setAuthProvider(AuthProvider
authProvider) {
+ _authProvider = authProvider;
+ return this;
+ }
+
public String run()
throws Exception {
if (_controllerHost == null) {
@@ -148,7 +154,8 @@ public class OperateClusterConfigCommand extends
AbstractBaseAdminCommand implem
}
String clusterConfigUrl =
_controllerProtocol + "://" + _controllerHost + ":" + _controllerPort
+ "/cluster/configs";
- List<Header> headers = makeAuthHeader(makeAuthToken(_authToken, _user,
_password));
+ List<Header> headers = makeAuthHeaders(makeAuthProvider(_authProvider,
_authTokenUrl, _authToken, _user,
+ _password));
switch (_operation.toUpperCase()) {
case "ADD":
case "UPDATE":
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
index 60c3c02d41..f004b5f367 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
@@ -19,6 +19,7 @@
package org.apache.pinot.tools.admin.command;
import java.util.Collections;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -54,10 +55,15 @@ public class PostQueryCommand extends
AbstractBaseAdminCommand implements Comman
@CommandLine.Option(names = {"-authToken"}, required = false, description =
"Http auth token.")
private String _authToken;
+ @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description
= "Http auth token url.")
+ private String _authTokenUrl;
+
@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required =
false, help = true, description = "Print "
+ "this message.")
private boolean _help = false;
+ private AuthProvider _authProvider;
+
@Override
public boolean getHelp() {
return _help;
@@ -108,13 +114,13 @@ public class PostQueryCommand extends
AbstractBaseAdminCommand implements Comman
return this;
}
- public PostQueryCommand setAuthToken(String authToken) {
- _authToken = authToken;
+ public PostQueryCommand setQuery(String query) {
+ _query = query;
return this;
}
- public PostQueryCommand setQuery(String query) {
- _query = query;
+ public PostQueryCommand setAuthProvider(AuthProvider authProvider) {
+ _authProvider = authProvider;
return this;
}
@@ -126,7 +132,8 @@ public class PostQueryCommand extends
AbstractBaseAdminCommand implements Comman
LOGGER.info("Executing command: " + this);
String url = _brokerProtocol + "://" + _brokerHost + ":" + _brokerPort +
"/query/sql";
String request =
JsonUtils.objectToString(Collections.singletonMap(Request.SQL, _query));
- return sendRequest("POST", url, request,
makeAuthHeader(makeAuthToken(_authToken, _user, _password)));
+ return sendRequest("POST", url, request,
makeAuthHeaders(makeAuthProvider(_authProvider,
+ _authTokenUrl, _authToken, _user, _password)));
}
@Override
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index 1f479ed46a..f2d7686873 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -63,7 +64,7 @@ public class QuickstartRunner {
private final int _numMinions;
private final File _tempDir;
private final boolean _enableTenantIsolation;
- private final String _authToken;
+ private final AuthProvider _authProvider;
private final Map<String, Object> _configOverrides;
private final boolean _deleteExistingData;
@@ -82,7 +83,7 @@ public class QuickstartRunner {
}
public QuickstartRunner(List<QuickstartTableRequest> tableRequests, int
numControllers, int numBrokers,
- int numServers, int numMinions, File tempDir, boolean enableIsolation,
String authToken,
+ int numServers, int numMinions, File tempDir, boolean enableIsolation,
AuthProvider authProvider,
Map<String, Object> configOverrides, String zkExternalAddress, boolean
deleteExistingData)
throws Exception {
_tableRequests = tableRequests;
@@ -92,7 +93,7 @@ public class QuickstartRunner {
_numMinions = numMinions;
_tempDir = tempDir;
_enableTenantIsolation = enableIsolation;
- _authToken = authToken;
+ _authProvider = authProvider;
_configOverrides = configOverrides;
_zkExternalAddress = zkExternalAddress;
_deleteExistingData = deleteExistingData;
@@ -218,8 +219,8 @@ public class QuickstartRunner {
public void bootstrapTable()
throws Exception {
for (QuickstartTableRequest request : _tableRequests) {
- if (!new BootstrapTableTool("http", "localhost",
_controllerPorts.get(0), request.getBootstrapTableDir(),
- _authToken).execute()) {
+ if (!new BootstrapTableTool("http", "localhost", _controllerPorts.get(0),
+ request.getBootstrapTableDir(), _authProvider).execute()) {
throw new RuntimeException("Failed to bootstrap table with request - "
+ request);
}
}
@@ -229,8 +230,8 @@ public class QuickstartRunner {
throws Exception {
int brokerPort = _brokerPorts.get(RANDOM.nextInt(_brokerPorts.size()));
return JsonUtils.stringToJsonNode(
- new
PostQueryCommand().setBrokerPort(String.valueOf(brokerPort)).setAuthToken(_authToken).setQuery(query)
- .run());
+ new
PostQueryCommand().setBrokerPort(String.valueOf(brokerPort)).setAuthProvider(_authProvider)
+ .setQuery(query).run());
}
public static void registerDefaultPinotFS() {
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
index 90957dcc91..535bc1e958 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
@@ -27,6 +27,7 @@ import org.apache.http.message.BasicNameValuePair;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.tools.Command;
@@ -63,6 +64,9 @@ public class UploadSegmentCommand extends
AbstractBaseAdminCommand implements Co
@CommandLine.Option(names = {"-authToken"}, required = false, description =
"Http auth token.")
private String _authToken;
+ @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description
= "Http auth token url.")
+ private String _authTokenUrl;
+
@CommandLine.Option(names = {"-segmentDir"}, required = true, description =
"Path to segment directory.")
private String _segmentDir = null;
@@ -74,6 +78,8 @@ public class UploadSegmentCommand extends
AbstractBaseAdminCommand implements Co
description = "Print this message.")
private boolean _help = false;
+ private AuthProvider _authProvider;
+
@Override
public boolean getHelp() {
return _help;
@@ -124,13 +130,13 @@ public class UploadSegmentCommand extends
AbstractBaseAdminCommand implements Co
return this;
}
- public UploadSegmentCommand setAuthToken(String authToken) {
- _authToken = authToken;
+ public UploadSegmentCommand setSegmentDir(String segmentDir) {
+ _segmentDir = segmentDir;
return this;
}
- public UploadSegmentCommand setSegmentDir(String segmentDir) {
- _segmentDir = segmentDir;
+ public UploadSegmentCommand setAuthProvider(AuthProvider authProvider) {
+ _authProvider = authProvider;
return this;
}
@@ -168,9 +174,9 @@ public class UploadSegmentCommand extends
AbstractBaseAdminCommand implements Co
LOGGER.info("Uploading segment tar file: {}", segmentTarFile);
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI,
segmentTarFile.getName(), segmentTarFile,
- makeAuthHeader(makeAuthToken(_authToken, _user, _password)),
Collections
- .singletonList(new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
_tableName)),
- HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl,
_authToken, _user, _password)),
+ Collections.singletonList(new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
+ _tableName)), HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
}
} finally {
// Delete the temporary working directory.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]