This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 40a3152 Add segment encryption on Controller based on table config
(#5617)
40a3152 is described below
commit 40a31529213efa7a5d03eb4e0ca020aa4045e0ca
Author: Sajjad <[email protected]>
AuthorDate: Wed Jul 1 13:23:24 2020 -0700
Add segment encryption on Controller based on table config (#5617)
* Add segment encryption on Controller based on table config
* Applied PR's comments
* Addressed PR's comments
* Add license header + equality check on crypters
* Addressed PR's comments
* Fix log message
Co-authored-by: Subbu Subramaniam <[email protected]>
Co-authored-by: Subbu Subramaniam <[email protected]>
---
.../common/utils/config/TableConfigUtils.java | 10 +-
.../pinot/common/utils/helix/TableCache.java | 4 +
.../common/utils/config/TableConfigSerDeTest.java | 6 +-
.../PinotSegmentUploadDownloadRestletResource.java | 122 ++++++++++++--------
.../pinot/controller/api/upload/ZKOperator.java | 11 +-
.../helix/core/PinotHelixResourceManager.java | 12 ++
...otSegmentUploadDownloadRestletResourceTest.java | 125 +++++++++++++++++++++
.../controller/api/upload/ZKOperatorTest.java | 11 +-
.../SegmentsValidationAndRetentionConfig.java | 13 ++-
.../apache/pinot/spi/crypt/NoOpPinotCrypter.java | 16 ++-
.../spi/utils/builder/TableConfigBuilder.java | 7 ++
11 files changed, 270 insertions(+), 67 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index 45610fa..2ee7b47 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -132,8 +132,8 @@ public class TableConfigUtils {
}
return new TableConfig(tableName, tableType, validationConfig,
tenantConfig, indexingConfig, customConfig,
- quotaConfig, taskConfig, routingConfig, queryConfig,
instanceAssignmentConfigMap, fieldConfigList,
- upsertConfig, ingestionConfig);
+ quotaConfig, taskConfig, routingConfig, queryConfig,
instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
+ ingestionConfig);
}
public static ZNRecord toZNRecord(TableConfig tableConfig)
@@ -228,8 +228,10 @@ public class TableConfigUtils {
}
String peerSegmentDownloadScheme =
validationConfig.getPeerSegmentDownloadScheme();
if (peerSegmentDownloadScheme != null) {
- if (!"http".equalsIgnoreCase(peerSegmentDownloadScheme) &&
!"https".equalsIgnoreCase(peerSegmentDownloadScheme)) {
- throw new IllegalStateException("Invalid value '" +
peerSegmentDownloadScheme + "' for peerSegmentDownloadScheme. Must be one of
http nor https" );
+ if (!"http".equalsIgnoreCase(peerSegmentDownloadScheme) && !"https"
+ .equalsIgnoreCase(peerSegmentDownloadScheme)) {
+ throw new IllegalStateException("Invalid value '" +
peerSegmentDownloadScheme
+ + "' for peerSegmentDownloadScheme. Must be one of http nor
https");
}
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index 474adbf..6f46ac2 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -80,6 +80,10 @@ public class TableCache {
return columnName;
}
+ public TableConfig getTableConfig(String tableName) {
+ return _tableConfigChangeListener._tableConfigMap.get(tableName);
+ }
+
class TableConfigChangeListener implements IZkChildListener, IZkDataListener
{
Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>();
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index af5e5c0..f877c3f 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -248,8 +248,10 @@ public class TableConfigSerDeTest {
{
// with SegmentsValidationAndRetentionConfig
TableConfig tableConfig =
tableConfigBuilder.setPeerSegmentDownloadScheme("http").build();
-
checkSegmentsValidationAndRetentionConfig(JsonUtils.stringToObject(tableConfig.toJsonString(),
TableConfig.class));
-
checkSegmentsValidationAndRetentionConfig(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
+ checkSegmentsValidationAndRetentionConfig(
+ JsonUtils.stringToObject(tableConfig.toJsonString(),
TableConfig.class));
+ checkSegmentsValidationAndRetentionConfig(
+
TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
}
{
// With ingestion config
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 576390d..1c9500e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.api.resources;
+import com.google.common.base.Strings;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@@ -54,6 +55,8 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.CommonConstants;
@@ -71,7 +74,6 @@ import
org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.metadata.DefaultMetadataExtractor;
import org.apache.pinot.core.metadata.MetadataExtractorFactory;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
-import org.apache.pinot.spi.crypt.NoOpPinotCrypter;
import org.apache.pinot.spi.crypt.PinotCrypter;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.filesystem.PinotFS;
@@ -177,13 +179,13 @@ public class PinotSegmentUploadDownloadRestletResource {
private SuccessResponse uploadSegment(@Nullable String tableName,
FormDataMultiPart multiPart,
boolean enableParallelPushProtection, HttpHeaders headers, Request
request, boolean moveSegmentToFinalLocation) {
String uploadTypeStr = null;
- String crypterClassName = null;
+ String crypterClassNameInHeader = null;
String downloadUri = null;
if (headers != null) {
extractHttpHeader(headers,
CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER);
extractHttpHeader(headers,
CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);
uploadTypeStr = extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
- crypterClassName = extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.CRYPTER);
+ crypterClassNameInHeader = extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.CRYPTER);
downloadUri = extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
}
@@ -194,38 +196,31 @@ public class PinotSegmentUploadDownloadRestletResource {
ControllerFilePathProvider provider =
ControllerFilePathProvider.getInstance();
String tempFileName = TMP_DIR_PREFIX + System.nanoTime();
tempDecryptedFile = new File(provider.getFileUploadTempDir(),
tempFileName);
+ tempEncryptedFile = new File(provider.getFileUploadTempDir(),
tempFileName + ENCRYPTED_SUFFIX);
tempSegmentDir = new File(provider.getUntarredFileTempDir(),
tempFileName);
- // Set default crypter to the noop crypter when no crypter header is sent
- // In this case, the noop crypter will not do any operations, so the
encrypted and decrypted file will have the same
- // file path.
- if (crypterClassName == null) {
- crypterClassName = NoOpPinotCrypter.class.getSimpleName();
- tempEncryptedFile = new File(provider.getFileUploadTempDir(),
tempFileName);
- } else {
- tempEncryptedFile = new File(provider.getFileUploadTempDir(),
tempFileName + ENCRYPTED_SUFFIX);
- }
-
- // TODO: Change when metadata upload added
- String metadataProviderClass = DefaultMetadataExtractor.class.getName();
+ boolean uploadedSegmentIsEncrypted =
!Strings.isNullOrEmpty(crypterClassNameInHeader);
- SegmentMetadata segmentMetadata;
+ File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile :
tempDecryptedFile;
FileUploadDownloadClient.FileUploadType uploadType =
getUploadType(uploadTypeStr);
switch (uploadType) {
case URI:
- segmentMetadata =
- getMetadataForURI(crypterClassName, downloadUri,
tempEncryptedFile, tempDecryptedFile, tempSegmentDir,
- metadataProviderClass);
+ downloadSegmentFileFromURI(downloadUri, dstFile, tableName);
break;
case SEGMENT:
- getFileFromMultipart(multiPart, tempEncryptedFile);
- segmentMetadata = getSegmentMetadata(crypterClassName,
tempEncryptedFile, tempDecryptedFile, tempSegmentDir,
- metadataProviderClass);
+ createSegmentFileFromMultipart(multiPart, dstFile);
break;
default:
throw new UnsupportedOperationException("Unsupported upload type: "
+ uploadType);
}
+ if (uploadedSegmentIsEncrypted) {
+ decryptFile(crypterClassNameInHeader, tempEncryptedFile,
tempDecryptedFile);
+ }
+
+ String metadataProviderClass = DefaultMetadataExtractor.class.getName();
+ SegmentMetadata segmentMetadata = getSegmentMetadata(tempDecryptedFile,
tempSegmentDir, metadataProviderClass);
+
// Fetch segment name
String segmentName = segmentMetadata.getName();
@@ -251,6 +246,17 @@ public class PinotSegmentUploadDownloadRestletResource {
_controllerMetrics,
_leadControllerManager.isLeaderForTable(offlineTableName))
.validateOfflineSegment(offlineTableName, segmentMetadata,
tempSegmentDir);
+ // Encrypt segment
+ String crypterClassNameInTableConfig =
+
_pinotHelixResourceManager.getCrypterClassNameFromTableConfig(offlineTableName);
+ Pair<String, File> encryptionInfo =
+ encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile,
uploadedSegmentIsEncrypted,
+ crypterClassNameInHeader, crypterClassNameInTableConfig,
segmentName, tableName);
+
+ String crypterClassName = encryptionInfo.getLeft();
+ File finalSegmentFile = encryptionInfo.getRight();
+
+ // ZK download URI
String zkDownloadUri;
// This boolean is here for V1 segment upload, where we keep the segment
in the downloadURI sent in the header.
// We will deprecate this behavior eventually.
@@ -264,8 +270,8 @@ public class PinotSegmentUploadDownloadRestletResource {
}
// Zk operations
- completeZkOperations(enableParallelPushProtection, headers,
tempEncryptedFile, rawTableName, segmentMetadata,
- segmentName, zkDownloadUri, moveSegmentToFinalLocation);
+ completeZkOperations(enableParallelPushProtection, headers,
finalSegmentFile, rawTableName, segmentMetadata,
+ segmentName, zkDownloadUri, moveSegmentToFinalLocation,
crypterClassName);
return new SuccessResponse("Successfully uploaded segment: " +
segmentName + " of table: " + rawTableName);
} catch (WebApplicationException e) {
@@ -290,6 +296,39 @@ public class PinotSegmentUploadDownloadRestletResource {
return value;
}
+ Pair<String, File> encryptSegmentIfNeeded(File tempDecryptedFile, File
tempEncryptedFile,
+ boolean isUploadedSegmentEncrypted, String crypterUsedInUploadedSegment,
String crypterClassNameInTableConfig,
+ String segmentName, String tableName) {
+
+ boolean segmentNeedsEncryption =
!Strings.isNullOrEmpty(crypterClassNameInTableConfig);
+
+ // form the output
+ File finalSegmentFile =
+ (isUploadedSegmentEncrypted || segmentNeedsEncryption) ?
tempEncryptedFile : tempDecryptedFile;
+ String crypterClassName =
Strings.isNullOrEmpty(crypterClassNameInTableConfig) ?
crypterUsedInUploadedSegment
+ : crypterClassNameInTableConfig;
+ ImmutablePair<String, File> out = ImmutablePair.of(crypterClassName,
finalSegmentFile);
+
+ if (!segmentNeedsEncryption) {
+ return out;
+ }
+
+ if (isUploadedSegmentEncrypted &&
!crypterClassNameInTableConfig.equals(crypterUsedInUploadedSegment)) {
+ throw new ControllerApplicationException(LOGGER, String.format(
+ "Uploaded segment is encrypted with '%s' while table config requires
'%s' as crypter "
+ + "(segment name = '%s', table name = '%s').",
crypterUsedInUploadedSegment,
+ crypterClassNameInTableConfig, segmentName, tableName),
Response.Status.INTERNAL_SERVER_ERROR);
+ }
+
+ // encrypt segment
+ PinotCrypter pinotCrypter =
PinotCrypterFactory.create(crypterClassNameInTableConfig);
+ LOGGER.info("Using crypter class '{}' for encrypting '{}' to '{}' (segment
name = '{}', table name = '{}').",
+ crypterClassNameInTableConfig, tempDecryptedFile, tempEncryptedFile,
segmentName, tableName);
+ pinotCrypter.encrypt(tempDecryptedFile, tempEncryptedFile);
+
+ return out;
+ }
+
private String getZkDownloadURIForSegmentUpload(String rawTableName, String
segmentName) {
ControllerFilePathProvider provider =
ControllerFilePathProvider.getInstance();
URI dataDirURI = provider.getDataDirURI();
@@ -303,46 +342,39 @@ public class PinotSegmentUploadDownloadRestletResource {
}
}
- private SegmentMetadata getMetadataForURI(String crypterClassHeader, String
currentSegmentLocationURI,
- File tempEncryptedFile, File tempDecryptedFile, File tempSegmentDir,
String metadataProviderClass)
+ private void downloadSegmentFileFromURI(String currentSegmentLocationURI,
File destFile, String tableName)
throws Exception {
- SegmentMetadata segmentMetadata;
if (currentSegmentLocationURI == null ||
currentSegmentLocationURI.isEmpty()) {
throw new ControllerApplicationException(LOGGER, "Failed to get
downloadURI, needed for URI upload",
Response.Status.BAD_REQUEST);
}
- LOGGER.info("Downloading segment from {} to {}",
currentSegmentLocationURI, tempEncryptedFile.getAbsolutePath());
- SegmentFetcherFactory.fetchSegmentToLocal(currentSegmentLocationURI,
tempEncryptedFile);
- segmentMetadata = getSegmentMetadata(crypterClassHeader,
tempEncryptedFile, tempDecryptedFile, tempSegmentDir,
- metadataProviderClass);
- return segmentMetadata;
+ LOGGER.info("Downloading segment from {} to {} for table {}",
currentSegmentLocationURI, destFile.getAbsolutePath(),
+ tableName);
+ SegmentFetcherFactory.fetchSegmentToLocal(currentSegmentLocationURI,
destFile);
}
- private SegmentMetadata getSegmentMetadata(String crypterClassHeader, File
tempEncryptedFile, File tempDecryptedFile,
- File tempSegmentDir, String metadataProviderClass)
+ private SegmentMetadata getSegmentMetadata(File tempDecryptedFile, File
tempSegmentDir, String metadataProviderClass)
throws Exception {
-
- decryptFile(crypterClassHeader, tempEncryptedFile, tempDecryptedFile);
-
// Call metadata provider to extract metadata with file object uri
return
MetadataExtractorFactory.create(metadataProviderClass).extractMetadata(tempDecryptedFile,
tempSegmentDir);
}
- private void completeZkOperations(boolean enableParallelPushProtection,
HttpHeaders headers, File tempEncryptedFile,
+ private void completeZkOperations(boolean enableParallelPushProtection,
HttpHeaders headers, File uploadedSegmentFile,
String rawTableName, SegmentMetadata segmentMetadata, String
segmentName, String zkDownloadURI,
- boolean moveSegmentToFinalLocation)
+ boolean moveSegmentToFinalLocation, String crypter)
throws Exception {
URI finalSegmentLocationURI = URIUtils
.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
rawTableName,
URIUtils.encode(segmentName));
ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager,
_controllerConf, _controllerMetrics);
- zkOperator.completeSegmentOperations(rawTableName, segmentMetadata,
finalSegmentLocationURI, tempEncryptedFile,
- enableParallelPushProtection, headers, zkDownloadURI,
moveSegmentToFinalLocation);
+ zkOperator.completeSegmentOperations(rawTableName, segmentMetadata,
finalSegmentLocationURI, uploadedSegmentFile,
+ enableParallelPushProtection, headers, zkDownloadURI,
moveSegmentToFinalLocation, crypter);
}
- private void decryptFile(String crypterClassHeader, File tempEncryptedFile,
File tempDecryptedFile) {
- PinotCrypter pinotCrypter = PinotCrypterFactory.create(crypterClassHeader);
- LOGGER.info("Using crypter class {}", pinotCrypter.getClass().getName());
+ private void decryptFile(String crypterClassName, File tempEncryptedFile,
File tempDecryptedFile) {
+ PinotCrypter pinotCrypter = PinotCrypterFactory.create(crypterClassName);
+ LOGGER.info("Using crypter class {} for decrypting {} to {}",
pinotCrypter.getClass().getName(), tempEncryptedFile,
+ tempDecryptedFile);
pinotCrypter.decrypt(tempEncryptedFile, tempDecryptedFile);
}
@@ -422,7 +454,7 @@ public class PinotSegmentUploadDownloadRestletResource {
}
}
- private File getFileFromMultipart(FormDataMultiPart multiPart, File dstFile)
+ private File createSegmentFileFromMultipart(FormDataMultiPart multiPart,
File dstFile)
throws IOException {
// Read segment file or segment metadata file and directly use that
information to update zk
Map<String, List<FormDataBodyPart>> segmentMetadataMap =
multiPart.getFields();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index b33161b..fc31c1c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -59,7 +59,7 @@ public class ZKOperator {
public void completeSegmentOperations(String rawTableName, SegmentMetadata
segmentMetadata,
URI finalSegmentLocationURI, File currentSegmentLocation, boolean
enableParallelPushProtection,
- HttpHeaders headers, String zkDownloadURI, boolean
moveSegmentToFinalLocation)
+ HttpHeaders headers, String zkDownloadURI, boolean
moveSegmentToFinalLocation, String crypter)
throws Exception {
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
String segmentName = segmentMetadata.getName();
@@ -69,7 +69,6 @@ public class ZKOperator {
_pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName,
segmentName);
if (segmentMetadataZnRecord == null) {
LOGGER.info("Adding new segment {} from table {}", segmentName,
rawTableName);
- String crypter =
headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER);
processNewSegment(segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation, zkDownloadURI, crypter,
rawTableName, segmentName, moveSegmentToFinalLocation);
return;
@@ -78,13 +77,14 @@ public class ZKOperator {
LOGGER.info("Segment {} from table {} already exists, refreshing if
necessary", segmentName, rawTableName);
processExistingSegment(segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation,
- enableParallelPushProtection, headers, zkDownloadURI,
offlineTableName, segmentName, segmentMetadataZnRecord,
- moveSegmentToFinalLocation);
+ enableParallelPushProtection, headers, zkDownloadURI, crypter,
offlineTableName, segmentName,
+ segmentMetadataZnRecord, moveSegmentToFinalLocation);
}
private void processExistingSegment(SegmentMetadata segmentMetadata, URI
finalSegmentLocationURI,
File currentSegmentLocation, boolean enableParallelPushProtection,
HttpHeaders headers, String zkDownloadURI,
- String offlineTableName, String segmentName, ZNRecord znRecord, boolean
moveSegmentToFinalLocation)
+ String crypter, String offlineTableName, String segmentName, ZNRecord
znRecord,
+ boolean moveSegmentToFinalLocation)
throws Exception {
OfflineSegmentZKMetadata existingSegmentZKMetadata = new
OfflineSegmentZKMetadata(znRecord);
@@ -170,7 +170,6 @@ public class ZKOperator {
zkDownloadURI);
}
- String crypter =
headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER);
_pinotHelixResourceManager
.refreshSegment(offlineTableName, segmentMetadata,
existingSegmentZKMetadata, zkDownloadURI, crypter);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a49c5e8..7abe59a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -458,6 +458,18 @@ public class PinotHelixResourceManager {
public String getActualColumnName(String tableName, String columnName) {
return _tableCache.getActualColumnName(tableName, columnName);
}
+
+ /**
+ * Given a table name in any case, returns crypter class name defined in
table config
+ * @param tableName table name in any case
+ * @return crypter class name
+ */
+ public String getCrypterClassNameFromTableConfig(String tableName) {
+ TableConfig tableConfig = _tableCache.getTableConfig(tableName);
+ Preconditions.checkNotNull(tableConfig, "Table config is not available for
table '%s'", tableName);
+ return tableConfig.getValidationConfig().getCrypterClassName();
+ }
+
/**
* Table related APIs
*/
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java
new file mode 100644
index 0000000..ff497e9
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.io.File;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.crypt.NoOpPinotCrypter;
+import org.apache.pinot.spi.crypt.PinotCrypterFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class PinotSegmentUploadDownloadRestletResourceTest {
+
+ private static final String TABLE_NAME = "table_abc";
+ private static final String SEGMENT_NAME = "segment_xyz";
+
+ private PinotSegmentUploadDownloadRestletResource _resource = new
PinotSegmentUploadDownloadRestletResource();
+ private File _encryptedFile;
+ private File _decryptedFile;
+
+ @BeforeClass
+ public void setup()
+ throws Exception {
+
+ // create temp files
+ _encryptedFile = File.createTempFile("segment", ".enc");
+ _decryptedFile = File.createTempFile("segment", ".dec");
+ _encryptedFile.deleteOnExit();
+ _decryptedFile.deleteOnExit();
+
+ // configure pinot crypter
+ Configuration conf = new PropertiesConfiguration();
+ conf.addProperty("class.nooppinotcrypter",
NoOpPinotCrypter.class.getName());
+ PinotCrypterFactory.init(conf);
+ }
+
+ @Test
+ public void testEncryptSegmentIfNeeded_crypterInTableConfig() {
+
+ // arrange
+ boolean uploadedSegmentIsEncrypted = false;
+ String crypterClassNameInTableConfig = "NoOpPinotCrypter";
+ String crypterClassNameUsedInUploadedSegment = null;
+
+ // act
+ Pair<String, File> encryptionInfo = _resource
+ .encryptSegmentIfNeeded(_decryptedFile, _encryptedFile,
uploadedSegmentIsEncrypted,
+ crypterClassNameUsedInUploadedSegment,
crypterClassNameInTableConfig, SEGMENT_NAME, TABLE_NAME);
+
+ // assert
+ assertEquals("NoOpPinotCrypter", encryptionInfo.getLeft());
+ assertEquals(_encryptedFile, encryptionInfo.getRight());
+ }
+
+ @Test
+ public void testEncryptSegmentIfNeeded_uploadedSegmentIsEncrypted() {
+
+ // arrange
+ boolean uploadedSegmentIsEncrypted = true;
+ String crypterClassNameInTableConfig = "NoOpPinotCrypter";
+ String crypterClassNameUsedInUploadedSegment = "NoOpPinotCrypter";
+
+ // act
+ Pair<String, File> encryptionInfo = _resource
+ .encryptSegmentIfNeeded(_decryptedFile, _encryptedFile,
uploadedSegmentIsEncrypted,
+ crypterClassNameUsedInUploadedSegment,
crypterClassNameInTableConfig, SEGMENT_NAME, TABLE_NAME);
+
+ // assert
+ assertEquals("NoOpPinotCrypter", encryptionInfo.getLeft());
+ assertEquals(_encryptedFile, encryptionInfo.getRight());
+ }
+
+ @Test(expectedExceptions = ControllerApplicationException.class,
expectedExceptionsMessageRegExp = "Uploaded segment"
+ + " is encrypted with 'FancyCrypter' while table config requires
'NoOpPinotCrypter' as crypter .*")
+ public void testEncryptSegmentIfNeeded_differentCrypters() {
+
+ // arrange
+ boolean uploadedSegmentIsEncrypted = true;
+ String crypterClassNameInTableConfig = "NoOpPinotCrypter";
+ String crypterClassNameUsedInUploadedSegment = "FancyCrypter";
+
+ // act
+ _resource.encryptSegmentIfNeeded(_decryptedFile, _encryptedFile,
uploadedSegmentIsEncrypted,
+ crypterClassNameUsedInUploadedSegment, crypterClassNameInTableConfig,
SEGMENT_NAME, TABLE_NAME);
+ }
+
+ @Test
+ public void testEncryptSegmentIfNeeded_noEncryption() {
+
+ // arrange
+ boolean uploadedSegmentIsEncrypted = false;
+ String crypterClassNameInTableConfig = null;
+ String crypterClassNameUsedInUploadedSegment = null;
+
+ // act
+ Pair<String, File> encryptionInfo = _resource
+ .encryptSegmentIfNeeded(_decryptedFile, _encryptedFile,
uploadedSegmentIsEncrypted,
+ crypterClassNameUsedInUploadedSegment,
crypterClassNameInTableConfig, SEGMENT_NAME, TABLE_NAME);
+
+ // assert
+ assertNull(encryptionInfo.getLeft());
+ assertEquals(_decryptedFile, encryptionInfo.getRight());
+ }
+}
\ No newline at end of file
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index 2b52497..bda1f09 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -66,9 +66,8 @@ public class ZKOperatorTest extends ControllerTest {
when(segmentMetadata.getCrc()).thenReturn("12345");
when(segmentMetadata.getIndexCreationTime()).thenReturn(123L);
HttpHeaders httpHeaders = mock(HttpHeaders.class);
-
when(httpHeaders.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER)).thenReturn("crypter");
zkOperator.completeSegmentOperations(RAW_TABLE_NAME, segmentMetadata,
null, null, false, httpHeaders, "downloadUrl",
- false);
+ false, "crypter");
OfflineSegmentZKMetadata segmentZKMetadata =
_helixResourceManager.getOfflineSegmentZKMetadata(RAW_TABLE_NAME,
SEGMENT_NAME);
@@ -84,7 +83,7 @@ public class ZKOperatorTest extends ControllerTest {
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123");
try {
zkOperator.completeSegmentOperations(RAW_TABLE_NAME, segmentMetadata,
null, null, false, httpHeaders,
- "otherDownloadUrl", false);
+ "otherDownloadUrl", false, null);
fail();
} catch (Exception e) {
// Expected
@@ -94,10 +93,9 @@ public class ZKOperatorTest extends ControllerTest {
// downloadURL and crypter
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345");
when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
-
when(httpHeaders.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER)).thenReturn("otherCrypter");
zkOperator
.completeSegmentOperations(RAW_TABLE_NAME, segmentMetadata, null,
null, false, httpHeaders, "otherDownloadUrl",
- false);
+ false, "otherCrypter");
segmentZKMetadata =
_helixResourceManager.getOfflineSegmentZKMetadata(RAW_TABLE_NAME, SEGMENT_NAME);
assertEquals(segmentZKMetadata.getCrc(), 12345L);
// Push time should not change
@@ -113,12 +111,11 @@ public class ZKOperatorTest extends ControllerTest {
// Refresh the segment with a different segment (different CRC)
when(segmentMetadata.getCrc()).thenReturn("23456");
when(segmentMetadata.getIndexCreationTime()).thenReturn(789L);
-
when(httpHeaders.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER)).thenReturn("otherCrypter");
// Add a tiny sleep to guarantee that refresh time is different from the
previous round
Thread.sleep(10L);
zkOperator
.completeSegmentOperations(RAW_TABLE_NAME, segmentMetadata, null,
null, false, httpHeaders, "otherDownloadUrl",
- false);
+ false, "otherCrypter");
segmentZKMetadata =
_helixResourceManager.getOfflineSegmentZKMetadata(RAW_TABLE_NAME, SEGMENT_NAME);
assertEquals(segmentZKMetadata.getCrc(), 23456L);
// Push time should not change
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index 30451ab..8bc1567 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -39,6 +39,7 @@ public class SegmentsValidationAndRetentionConfig extends
BaseJsonConfig {
private String _segmentAssignmentStrategy;
private ReplicaGroupStrategyConfig _replicaGroupStrategyConfig;
private CompletionConfig _completionConfig;
+ private String _crypterClassName;
// Possible values can be http or https. If this field is set, a Pinot
server can download segments from peer servers
// using the specified download scheme. Both realtime tables and offline
tables can set this field.
// For more usage of this field, please refer to this design doc:
@@ -158,9 +159,19 @@ public class SegmentsValidationAndRetentionConfig extends
BaseJsonConfig {
return Integer.parseInt(_replicasPerPartition);
}
- public String getPeerSegmentDownloadScheme() { return
_peerSegmentDownloadScheme; }
+ public String getPeerSegmentDownloadScheme() {
+ return _peerSegmentDownloadScheme;
+ }
public void setPeerSegmentDownloadScheme(String peerSegmentDownloadScheme) {
_peerSegmentDownloadScheme = peerSegmentDownloadScheme;
}
+
+ public String getCrypterClassName() {
+ return _crypterClassName;
+ }
+
+ public void setCrypterClassName(String crypterClassName) {
+ _crypterClassName = crypterClassName;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/crypt/NoOpPinotCrypter.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/crypt/NoOpPinotCrypter.java
index 1f5e83a..803147f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/crypt/NoOpPinotCrypter.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/crypt/NoOpPinotCrypter.java
@@ -19,7 +19,9 @@
package org.apache.pinot.spi.crypt;
import java.io.File;
+import java.io.IOException;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,11 +39,21 @@ public class NoOpPinotCrypter implements PinotCrypter {
@Override
public void encrypt(File decryptedFile, File encryptedFile) {
- return;
+ try {
+ FileUtils.copyFile(decryptedFile, encryptedFile);
+ } catch (IOException e) {
+ LOGGER.warn("Could not encrypt file");
+ FileUtils.deleteQuietly(encryptedFile);
+ }
}
@Override
public void decrypt(File encryptedFile, File decryptedFile) {
- return;
+ try {
+ FileUtils.copyFile(encryptedFile, decryptedFile);
+ } catch (IOException e) {
+ LOGGER.warn("Could not decrypt file");
+ FileUtils.deleteQuietly(decryptedFile);
+ }
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index a81836f..055b22d 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -68,6 +68,7 @@ public class TableConfigBuilder {
private String _peerSegmentDownloadScheme;
private ReplicaGroupStrategyConfig _replicaGroupStrategyConfig;
private CompletionConfig _completionConfig;
+ private String _crypterClassName;
// Tenant config related
private String _brokerTenant;
@@ -173,6 +174,11 @@ public class TableConfigBuilder {
return this;
}
+ public TableConfigBuilder setCrypterClassName(String crypterClassName) {
+ _crypterClassName = crypterClassName;
+ return this;
+ }
+
public TableConfigBuilder setBrokerTenant(String brokerTenant) {
_brokerTenant = brokerTenant;
return this;
@@ -318,6 +324,7 @@ public class TableConfigBuilder {
if (_isLLC) {
validationConfig.setReplicasPerPartition(_numReplicas);
}
+ validationConfig.setCrypterClassName(_crypterClassName);
// Tenant config
TenantConfig tenantConfig = new TenantConfig(_brokerTenant, _serverTenant,
_tagOverrideConfig);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]