mccheah commented on a change in pull request #107: Integrate encryption into datasource
URL: https://github.com/apache/incubator-iceberg/pull/107#discussion_r258718902
##########
File path: spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
##########
@@ -260,26 +276,51 @@ private Schema lazyExpectedSchema() {
}
private static class TaskDataReader implements InputPartitionReader<InternalRow> {
+ private final class DecryptedFileScanTask {
+ final FileScanTask fileScanTask;
+ final InputFile inputFile;
+
+ public DecryptedFileScanTask(FileScanTask fileScanTask, InputFile inputFile) {
+ this.fileScanTask = fileScanTask;
+ this.inputFile = inputFile;
+ }
+ }
+
// for some reason, the apply method can't be called from Java without reflection
private static final DynMethods.UnboundMethod APPLY_PROJECTION =
DynMethods.builder("apply")
.impl(UnsafeProjection.class, InternalRow.class)
.build();
- private final Iterator<FileScanTask> tasks;
private final Schema tableSchema;
private final Schema expectedSchema;
private final FileIO fileIo;
+ private final Iterator<FileScanTask> tasks;
+ private final Map<String, InputFile> inputFiles;
+
private Iterator<InternalRow> currentIterator = null;
private Closeable currentCloseable = null;
private InternalRow current = null;
- public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo) {
+ public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo,
+ EncryptionManager encryptionManager) {
this.fileIo = fileIo;
this.tasks = task.files().iterator();
this.tableSchema = tableSchema;
this.expectedSchema = expectedSchema;
// open last because the schemas and fileIo must be set
+ Iterable<EncryptedInputFile> inputFileIterator = task.files().stream()
+ .map(fileScanTask ->
+ EncryptedFiles.encryptedInput(
+ this.fileIo.newInputFile(fileScanTask.file().path().toString()),
+ fileScanTask.file().keyMetadata()))
+ .collect(Collectors.toList());
+ Iterable<InputFile> inputFiles = encryptionManager.decrypt(inputFileIterator);
Review comment:
Nit: Some implementations won't want to do batch operations and will instead
walk the iterator and decrypt each file individually. In those cases there's
no benefit to storing a second list via the `Collectors.toList()` above.
Instead, why not pass `encryptionManager#decrypt` a lambda that generates the
iterator from the stream?
```
Iterable<InputFile> inputFiles = encryptionManager.decrypt(
    () -> task.files().stream().map(...).iterator());
```
Leave it up to the implementation whether to collect the Iterable into a
collection and operate on it as a batch, or to stream through the values and
decrypt each one individually.
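
To make the suggestion concrete, here is a minimal, self-contained sketch of
the pattern. It does not use the Iceberg types: `LazyDecryptSketch`, `wrap`,
`lazyInputs`, `decryptOneByOne` and `decryptAsBatch` are hypothetical
stand-ins, with plain strings in place of `EncryptedInputFile` and
`InputFile`. The only thing it relies on is that `Iterable` has a single
abstract method, so a lambda returning an `Iterator` can be passed wherever an
`Iterable` is expected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyDecryptSketch {
  // Counts how many inputs have been wrapped so far, to make laziness visible.
  static final AtomicInteger WRAPPED = new AtomicInteger();

  // Hypothetical stand-in for EncryptedFiles.encryptedInput(...).
  static String wrap(String path) {
    WRAPPED.incrementAndGet();
    return "encrypted:" + path;
  }

  // The lambda is the Iterable: every iterator() call builds a fresh stream,
  // so nothing is wrapped or stored until a caller starts iterating.
  static Iterable<String> lazyInputs(List<String> paths) {
    return () -> paths.stream().map(LazyDecryptSketch::wrap).iterator();
  }

  // Hypothetical per-file implementation: decrypts as the caller iterates.
  static Iterable<String> decryptOneByOne(Iterable<String> encrypted) {
    return () -> {
      Iterator<String> source = encrypted.iterator();
      return new Iterator<String>() {
        @Override public boolean hasNext() { return source.hasNext(); }
        @Override public String next() {
          return source.next().replace("encrypted:", "plain:");
        }
      };
    };
  }

  // Hypothetical batch implementation: collects the inputs itself, then
  // decrypts them together.
  static Iterable<String> decryptAsBatch(Iterable<String> encrypted) {
    List<String> batch = new ArrayList<>();
    encrypted.forEach(batch::add);
    List<String> result = new ArrayList<>();
    for (String file : batch) {
      result.add(file.replace("encrypted:", "plain:"));
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList("a.parquet", "b.parquet");
    for (String file : decryptOneByOne(lazyInputs(paths))) {
      System.out.println(file);
    }
  }
}
```

With this shape the caller never builds an intermediate list. Only an
implementation that wants a batch pays for one, and it does so inside its own
`decrypt`.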