aokolnychyi commented on a change in pull request #432: Allow writers to 
control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r322597669
 
 

 ##########
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##########
 @@ -303,96 +304,179 @@ public String toString() {
         }
       }
     }
+
+    private class EncryptedOutputFileFactory implements 
OutputFileFactory<EncryptedOutputFile> {
+      private final int partitionId;
+      private final long taskId;
+      private final long epochId;
+      // The purpose of this uuid is to be able to know from two paths that 
they were written by the same operation.
+      // That's useful, for example, if a Spark job dies and leaves files in 
the file system, you can identify them all
+      // with a recursive listing and grep.
+      private final String uuid = UUID.randomUUID().toString();
+      private int fileCount;
+
+      EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+        this.partitionId = partitionId;
+        this.taskId = taskId;
+        this.epochId = epochId;
+        this.fileCount = 0;
+      }
+
+      private synchronized String generateFilename() {
+        return format.addExtension(String.format("%05d-%d-%s-%05d", 
partitionId, taskId, uuid, fileCount++));
+      }
+
+      /**
+       * Generates EncryptedOutputFile for UnpartitionedWriter.
+       */
+      public EncryptedOutputFile newOutputFile() {
+        OutputFile file = 
fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
+        return encryptionManager.encrypt(file);
+      }
+
+      /**
+       * Generates EncryptedOutputFile for PartitionedWriter.
+       */
+      public EncryptedOutputFile newOutputFile(PartitionKey key) {
+        OutputFile rawOutputFile = 
fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
+        return encryptionManager.encrypt(rawOutputFile);
+      }
+    }
   }
 
   private interface AppenderFactory<T> {
     FileAppender<T> newAppender(OutputFile file, FileFormat format);
   }
 
-  private static class UnpartitionedWriter implements DataWriter<InternalRow>, 
Closeable {
-    private final FileIO fileIo;
-    private FileAppender<InternalRow> appender = null;
-    private Metrics metrics = null;
-    private List<Long> offsetRanges = null;
-    private final EncryptedOutputFile file;
+  private interface OutputFileFactory<T> {
+    T newOutputFile();
+    T newOutputFile(PartitionKey key);
+  }
 
-    UnpartitionedWriter(
-        EncryptedOutputFile outputFile,
-        FileFormat format,
-        AppenderFactory<InternalRow> factory,
-        FileIO fileIo) {
+  @SuppressWarnings("checkstyle:VisibilityModifier") // direct access desired 
from sub-classes for performance.
+  private abstract static class BaseWriter implements DataWriter<InternalRow> {
+    protected static final int ROWS_DIVISOR = 1000;
+
+    protected final Set<PartitionKey> completedPartitions = Sets.newHashSet();
+    protected final List<DataFile> completedFiles = Lists.newArrayList();
+    protected final PartitionSpec spec;
+    protected final FileFormat format;
+    protected final AppenderFactory<InternalRow> appenderFactory;
+    protected final OutputFileFactory<EncryptedOutputFile> fileFactory;
+    protected final PartitionKey key;
+    protected final FileIO fileIo;
+    protected final long targetFileSize;
+    protected PartitionKey currentKey = null;
+    protected FileAppender<InternalRow> currentAppender = null;
+    protected EncryptedOutputFile currentFile = null;
+    protected long currentRows;
+
+    BaseWriter(PartitionSpec spec, FileFormat format, 
AppenderFactory<InternalRow> appenderFactory,
+        OutputFileFactory<EncryptedOutputFile> fileFactory, FileIO fileIo, 
long targetFileSize) {
 
 Review comment:
  I think the codebase is not consistent here. As I recall, we decided not to enforce one argument per line, which makes sense to me. I believe we have two acceptable options right now for wrapping a long parameter list:
   
   ```
   BaseWriter(PartitionSpec spec, FileFormat format, AppenderFactory<InternalRow> appenderFactory,
              OutputFileFactory<EncryptedOutputFile> fileFactory, FileIO fileIo, long targetFileSize) {
   ```
   
   or 
   
   ```
   BaseWriter(
     PartitionSpec spec, FileFormat format, AppenderFactory<InternalRow> appenderFactory,
     OutputFileFactory<EncryptedOutputFile> fileFactory, FileIO fileIo, long targetFileSize) {
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to