mccheah commented on a change in pull request #107: Integrate encryption into datasource
URL: https://github.com/apache/incubator-iceberg/pull/107#discussion_r258717900
##########
File path: spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
##########
@@ -260,26 +276,51 @@ private Schema lazyExpectedSchema() {
}
private static class TaskDataReader implements InputPartitionReader<InternalRow> {
+ private final class DecryptedFileScanTask {
+ final FileScanTask fileScanTask;
+ final InputFile inputFile;
+
+ public DecryptedFileScanTask(FileScanTask fileScanTask, InputFile inputFile) {
+ this.fileScanTask = fileScanTask;
+ this.inputFile = inputFile;
+ }
+ }
+
// for some reason, the apply method can't be called from Java without reflection
private static final DynMethods.UnboundMethod APPLY_PROJECTION =
DynMethods.builder("apply")
.impl(UnsafeProjection.class, InternalRow.class)
.build();
- private final Iterator<FileScanTask> tasks;
private final Schema tableSchema;
private final Schema expectedSchema;
private final FileIO fileIo;
+ private final Iterator<FileScanTask> tasks;
+ private final Map<String, InputFile> inputFiles;
+
private Iterator<InternalRow> currentIterator = null;
private Closeable currentCloseable = null;
private InternalRow current = null;
- public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo) {
+ public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo,
+ EncryptionManager encryptionManager) {
this.fileIo = fileIo;
this.tasks = task.files().iterator();
this.tableSchema = tableSchema;
this.expectedSchema = expectedSchema;
// open last because the schemas and fileIo must be set
+ Iterable<EncryptedInputFile> inputFileIterator = task.files().stream()
+ .map(fileScanTask ->
+ EncryptedFiles.encryptedInput(
+ this.fileIo.newInputFile(fileScanTask.file().path().toString()),
+ fileScanTask.file().keyMetadata()))
+ .collect(Collectors.toList());
+ Iterable<InputFile> inputFiles = encryptionManager.decrypt(inputFileIterator);
+ this.inputFiles = StreamSupport.stream(inputFiles.spliterator(), false)
+ .collect(Collectors.toMap(
+ inputFile -> inputFile.location(),
+ inputFile -> inputFile));
Review comment:
Nit: Try Guava's `Functions.identity()` (or the JDK's `Function.identity()`) in place of `inputFile -> inputFile`.
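For illustration, a minimal sketch of what the nit might look like when applied, using the JDK's `java.util.function.Function.identity()` as the value mapper. `FakeInputFile` and `IdentityMapperSketch` are hypothetical stand-ins, not classes from the PR; only the `location()` accessor mirrors the code under review. Guava's `Functions.identity()` should be usable the same way, assuming a Guava version whose `Function` extends the JDK interface.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for Iceberg's InputFile; only location() matters here.
class FakeInputFile {
  private final String location;

  FakeInputFile(String location) {
    this.location = location;
  }

  String location() {
    return location;
  }
}

class IdentityMapperSketch {
  // Index files by location. Function.identity() replaces the lambda
  // `inputFile -> inputFile`, and a method reference replaces
  // `inputFile -> inputFile.location()`.
  static Map<String, FakeInputFile> indexByLocation(List<FakeInputFile> files) {
    return files.stream()
        .collect(Collectors.toMap(FakeInputFile::location, Function.identity()));
  }

  public static void main(String[] args) {
    List<FakeInputFile> files = Arrays.asList(
        new FakeInputFile("s3://bucket/a.parquet"),
        new FakeInputFile("s3://bucket/b.parquet"));
    Map<String, FakeInputFile> byLocation = indexByLocation(files);
    System.out.println(byLocation.keySet());
  }
}
```

Note that `Collectors.toMap` without a merge function throws `IllegalStateException` on duplicate keys, so this form assumes each location appears only once in the task.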