rdblue commented on a change in pull request #432: Allow writers to control 
size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r321016595
 
 

 ##########
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##########
 @@ -303,66 +305,128 @@ public String toString() {
         }
       }
     }
+
+    private class EncryptedOutputFileFactory implements 
OutputFileFactory<EncryptedOutputFile> {
+      private final int partitionId;
+      private final long taskId;
+      private final long epochId;
+
+      EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+        this.partitionId = partitionId;
+        this.taskId = taskId;
+        this.epochId = epochId;
+      }
+
+      private String generateFilename() {
+        return format.addExtension(String.format("%05d-%d-%s", partitionId, 
taskId, UUID.randomUUID().toString()));
+      }
+
+      /**
+       * Generates EncryptedOutputFile for UnpartitionedWriter.
+       */
+      public EncryptedOutputFile newOutputFile() {
+        OutputFile file = 
fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
+        return encryptionManager.encrypt(file);
+      }
+
+      /**
+       * Generates EncryptedOutputFile for PartitionedWriter.
+       */
+      public EncryptedOutputFile newOutputFile(PartitionKey key) {
+        OutputFile rawOutputFile = 
fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
+        return encryptionManager.encrypt(rawOutputFile);
+      }
+    }
   }
 
   private interface AppenderFactory<T> {
     FileAppender<T> newAppender(OutputFile file, FileFormat format);
   }
 
-  private static class UnpartitionedWriter implements DataWriter<InternalRow>, 
Closeable {
+  private interface OutputFileFactory<T> {
+    T newOutputFile();
+    T newOutputFile(PartitionKey key);
+  }
+
+  private static class UnpartitionedWriter implements DataWriter<InternalRow> {
     private final FileIO fileIo;
-    private FileAppender<InternalRow> appender = null;
-    private Metrics metrics = null;
-    private List<Long> offsetRanges = null;
-    private final EncryptedOutputFile file;
+    private FileAppender<InternalRow> currentAppender = null;
+    private final OutputFileFactory<EncryptedOutputFile> fileFactory;
+    private final FileFormat format;
+    private final AppenderFactory<InternalRow> appenderFactory;
+    private EncryptedOutputFile currentFile = null;
+    private final List<DataFile> completedFiles = Lists.newArrayList();
+    private final long targetFileSize;
 
     UnpartitionedWriter(
-        EncryptedOutputFile outputFile,
+        OutputFileFactory<EncryptedOutputFile> fileFactory,
         FileFormat format,
-        AppenderFactory<InternalRow> factory,
-        FileIO fileIo) {
+        AppenderFactory<InternalRow> appenderFactory,
+        FileIO fileIo,
+        long targetFileSize) {
+      this.fileFactory = fileFactory;
+      this.format = format;
+      this.appenderFactory = appenderFactory;
       this.fileIo = fileIo;
-      this.file = outputFile;
-      this.appender = factory.newAppender(file.encryptingOutputFile(), format);
+      this.targetFileSize = targetFileSize;
+
+      openCurrent();
     }
 
     @Override
-    public void write(InternalRow record) {
-      appender.add(record);
+    public void write(InternalRow record) throws IOException {
 
 Review comment:
   This is called in a tight loop, and `currentAppender.length()` is expensive 
in some cases. Parquet will find the size of the current row group and add that 
to its position in the file. Can we update this so the file-size check only runs 
every 1,000 records or so?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to