xabriel commented on a change in pull request #432: Allow writers to control 
size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r321467275
 
 

 ##########
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##########
 @@ -354,59 +353,47 @@ public EncryptedOutputFile newOutputFile(PartitionKey 
key) {
     T newOutputFile(PartitionKey key);
   }
 
-  private static class UnpartitionedWriter implements DataWriter<InternalRow> {
-    private static final int ROWS_DIVISOR = 1000;
-
-    private final FileIO fileIo;
-    private FileAppender<InternalRow> currentAppender = null;
-    private final OutputFileFactory<EncryptedOutputFile> fileFactory;
-    private final FileFormat format;
-    private final AppenderFactory<InternalRow> appenderFactory;
-    private EncryptedOutputFile currentFile = null;
-    private final List<DataFile> completedFiles = Lists.newArrayList();
-    private final long targetFileSize;
-    private long currentRows;
-
-    UnpartitionedWriter(
-        OutputFileFactory<EncryptedOutputFile> fileFactory,
-        FileFormat format,
-        AppenderFactory<InternalRow> appenderFactory,
-        FileIO fileIo,
-        long targetFileSize) {
-      this.fileFactory = fileFactory;
+  @SuppressWarnings("checkstyle:VisibilityModifier") // direct access desired 
from sub-classes for performance.
+  private abstract static class BaseWriter implements DataWriter<InternalRow> {
+    protected static final int ROWS_DIVISOR = 1000;
+
+    protected final Set<PartitionKey> completedPartitions = Sets.newHashSet();
+    protected final List<DataFile> completedFiles = Lists.newArrayList();
+    protected final PartitionSpec spec;
+    protected final FileFormat format;
+    protected final AppenderFactory<InternalRow> appenderFactory;
+    protected final OutputFileFactory<EncryptedOutputFile> fileFactory;
+    protected final PartitionKey key;
+    protected final FileIO fileIo;
+    protected final long targetFileSize;
+    protected PartitionKey currentKey = null;
+    protected FileAppender<InternalRow> currentAppender = null;
+    protected EncryptedOutputFile currentFile = null;
+    protected long currentRows;
+
+    BaseWriter(PartitionSpec spec, FileFormat format, 
AppenderFactory<InternalRow> appenderFactory,
+        OutputFileFactory<EncryptedOutputFile> fileFactory, FileIO fileIo, 
long targetFileSize) {
+      this.spec = spec;
       this.format = format;
       this.appenderFactory = appenderFactory;
+      this.fileFactory = fileFactory;
+      this.key = new PartitionKey(spec);
       this.fileIo = fileIo;
       this.targetFileSize = targetFileSize;
-
-      openCurrent();
     }
 
     @Override
-    public void write(InternalRow record) throws IOException {
-      if (currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= 
targetFileSize) {
-        closeCurrent();
-        openCurrent();
-      }
-
-      currentAppender.add(record);
-      currentRows++;
-    }
+    public abstract void write(InternalRow row) throws IOException;
 
     @Override
     public WriterCommitMessage commit() throws IOException {
-      Preconditions.checkArgument(currentAppender != null, "Commit called on a 
closed writer: %s", this);
 
 Review comment:
   These precondition checks on `currentAppender`, which were previously only 
done in `UnpartitionedWriter`, are no longer done in `BaseWriter` because they 
make `PartitionedWriter` fail.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to